Skip to content

Commit

Permalink
Attempt to re-register client if first connect attempt fails (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 authored May 2, 2022
1 parent f252b1e commit db8227d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 31 deletions.
17 changes: 3 additions & 14 deletions server/activate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/mattermost/rtcd/logger"
rtcd "github.com/mattermost/rtcd/service"
"github.com/mattermost/rtcd/service/rtc"

pluginapi "github.com/mattermost/mattermost-plugin-api"
Expand Down Expand Up @@ -42,25 +41,15 @@ func (p *Plugin) OnActivate() error {
}

if cfg.RTCDServiceURL != "" {
clientCfg, err := p.getRTCDClientConfig(cfg.RTCDServiceURL)
if err != nil {
err = fmt.Errorf("failed to get rtcd client config: %w", err)
p.LogError(err.Error())
return err
}

client, err := rtcd.NewClient(clientCfg)
client, err := p.newRTCDClient(cfg.RTCDServiceURL)
if err != nil {
err = fmt.Errorf("failed to create rtcd client: %w", err)
p.LogError(err.Error())
return err
}

if err := client.Connect(); err != nil {
err = fmt.Errorf("failed to connect rtcd client: %w", err)
p.LogError(err.Error())
return err
}
p.LogDebug("rtcd client connected successfully")

p.rtcdClient = client
go func() {
for err := range p.rtcdClient.ErrorCh() {
Expand Down
97 changes: 80 additions & 17 deletions server/rtcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,85 @@ func (p *Plugin) getStoredRTCDConfig() (rtcd.ClientConfig, error) {
return cfg, nil
}

func (p *Plugin) registerRTCDClient(cfg rtcd.ClientConfig) (rtcd.ClientConfig, error) {
client, err := rtcd.NewClient(cfg)
if err != nil {
return cfg, fmt.Errorf("failed to create rtcd client: %w", err)
}
defer client.Close()

cfg.AuthKey, err = client.Register(cfg.ClientID)
if err != nil {
return cfg, fmt.Errorf("failed to register rtcd client: %w", err)
}

cfgData, err := json.Marshal(&cfg)
if err != nil {
return cfg, fmt.Errorf("failed to marshal rtcd client config: %w", err)
}

if err := p.API.KVSet(rtcdConfigKey, cfgData); err != nil {
return cfg, fmt.Errorf("failed to store rtcd client config: %w", err)
}

p.LogDebug("rtcd client registered successfully", "clientID", cfg.ClientID)

return cfg, nil
}

func (p *Plugin) newRTCDClient(rtcdURL string) (*rtcd.Client, error) {
clientCfg, err := p.getRTCDClientConfig(rtcdURL)
if err != nil {
return nil, fmt.Errorf("failed to get rtcd client config: %w", err)
}

client, err := rtcd.NewClient(clientCfg)
if err != nil {
return nil, fmt.Errorf("failed to create rtcd client: %w", err)
}

err = client.Connect()
if err == nil {
return client, nil
}
defer client.Close()

// If connecting fails we attempt to re-register once as the rtcd instance may
// have restarted, potentially losing stored credentials.

p.LogError(fmt.Sprintf("failed to connect rtcd client: %s", err.Error()))
p.LogDebug("attempting to re-register the rtcd client")

mutex, err := cluster.NewMutex(p.API, "rtcd_registration")
if err != nil {
return nil, fmt.Errorf("failed to create cluster mutex: %w", err)
}

lockCtx, cancelCtx := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelCtx()
if err := mutex.LockWithContext(lockCtx); err != nil {
return nil, fmt.Errorf("failed to acquire cluster lock: %w", err)
}
defer mutex.Unlock()

newCfg, err := p.registerRTCDClient(clientCfg)
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to register rtcd client: %w", err)
}

newClient, err := rtcd.NewClient(newCfg)
if err != nil {
return nil, fmt.Errorf("failed to create rtcd client: %w", err)
}

if err := newClient.Connect(); err != nil {
return nil, fmt.Errorf("failed to connect rtcd client: %w", err)
}

return newClient, nil
}

func (p *Plugin) getRTCDClientConfig(rtcdURL string) (rtcd.ClientConfig, error) {
var cfg rtcd.ClientConfig

Expand Down Expand Up @@ -69,12 +148,6 @@ func (p *Plugin) getRTCDClientConfig(rtcdURL string) (rtcd.ClientConfig, error)
return cfg, nil
}

client, err := rtcd.NewClient(cfg)
if err != nil {
return cfg, fmt.Errorf("failed to create rtcd client: %w", err)
}
defer client.Close()

// Here we need some coordination to avoid multiple plugin instances to
// register at the same time (at most one would succeed).
mutex, err := cluster.NewMutex(p.API, "rtcd_registration")
Expand All @@ -95,19 +168,9 @@ func (p *Plugin) getRTCDClientConfig(rtcdURL string) (rtcd.ClientConfig, error)
return storedCfg, nil
}

cfg.AuthKey, err = client.Register(cfg.ClientID)
if err != nil {
if cfg, err := p.registerRTCDClient(cfg); err != nil {
return cfg, fmt.Errorf("failed to register rtcd client: %w", err)
}

cfgData, err := json.Marshal(&cfg)
if err != nil {
return cfg, fmt.Errorf("failed to marshal rtcd client config: %w", err)
}

if err := p.API.KVSet(rtcdConfigKey, cfgData); err != nil {
return cfg, fmt.Errorf("failed to store rtcd client config: %w", err)
}

return cfg, nil
}

0 comments on commit db8227d

Please sign in to comment.