[MM-54232] Simplify job service logic (#501)
* Simplify job service logic

* Tidy modules

* Update dep
streamer45 authored Aug 31, 2023
1 parent 177eded commit a1ee1ea
Showing 2 changed files with 26 additions and 139 deletions.
24 changes: 3 additions & 21 deletions server/activate.go
@@ -114,31 +114,13 @@ func (p *Plugin) OnActivate() error {
 	p.botSession = session
 
 	if p.licenseChecker.RecordingsAllowed() && cfg.recordingsEnabled() {
-		recorderVersion, ok := manifest.Props["calls_recorder_version"].(string)
-		if !ok {
-			err = fmt.Errorf("failed to get recorder version from manifest")
-			p.LogError(err.Error())
-			return err
-		}
-		recordingJobRunner = "mattermost/calls-recorder:" + recorderVersion
-
-		if err := p.initJobService(); err != nil {
-			err = fmt.Errorf("failed to initialize job service: %w", err)
-			p.LogError(err.Error())
-			return err
-		}
-
-		p.LogDebug("job service initialized successfully")
-
 		go func() {
-			p.LogDebug("updating job runner")
-			jobService := p.getJobService()
-			if err := jobService.UpdateJobRunner(recordingJobRunner); err != nil {
-				err = fmt.Errorf("failed to update job runner: %w", err)
+			if err := p.initJobService(); err != nil {
+				err = fmt.Errorf("failed to initialize job service: %w", err)
 				p.LogError(err.Error())
 				return
 			}
-			p.LogDebug("job runner updated successfully")
+			p.LogDebug("job service initialized successfully")
 		}()
 	}
 
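Net effect on OnActivate: the recorder-version lookup and the separate runner update are gone from activation, and the recording branch only kicks off job-service initialization in the background. The block below is assembled from the kept and added lines of the hunk above (the rest of OnActivate is unchanged):

	if p.licenseChecker.RecordingsAllowed() && cfg.recordingsEnabled() {
		go func() {
			// initJobService now resolves the recorder image itself and hands
			// the runner to jobService.Init (see server/job_service.go below).
			if err := p.initJobService(); err != nil {
				err = fmt.Errorf("failed to initialize job service: %w", err)
				p.LogError(err.Error())
				return
			}
			p.LogDebug("job service initialized successfully")
		}()
	}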
141 changes: 23 additions & 118 deletions server/job_service.go
@@ -6,9 +6,7 @@ package main
 import (
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
-	"math/rand"
 	"os"
 	"strings"
 	"time"
@@ -143,40 +141,18 @@ func (p *Plugin) getJobServiceClientConfig(serviceURL string) (offloader.ClientC
 		}
 	}
 
-	if err := p.registerJobServiceClient(cfg); err != nil {
-		return cfg, fmt.Errorf("failed to register job service client: %w", err)
-	}
-
-	return cfg, nil
-}
-
-func (p *Plugin) registerJobServiceClient(cfg offloader.ClientConfig) error {
-	client, err := offloader.NewClient(cfg)
-	if err != nil {
-		return fmt.Errorf("failed to create job service client: %w", err)
-	}
-	defer client.Close()
-
 	cfgData, err := json.Marshal(&cfg)
 	if err != nil {
-		return fmt.Errorf("failed to marshal job service client config: %w", err)
-	}
-
-	if err := client.Register(cfg.ClientID, cfg.AuthKey); err != nil {
-		return fmt.Errorf("failed to register job service client: %w", err)
+		return cfg, fmt.Errorf("failed to marshal job service client config: %w", err)
 	}
 
 	// TODO: guard against the "locked out" corner case that the server/plugin process exits
 	// before being able to store the credentials but after a successful
 	// registration.
 	// Saving auth credentials.
 	p.metrics.IncStoreOp("KVSet")
 	if err := p.API.KVSet(jobServiceConfigKey, cfgData); err != nil {
-		return fmt.Errorf("failed to store job service client config: %w", err)
+		return cfg, fmt.Errorf("failed to store job service client config: %w", err)
 	}
 
-	p.LogDebug("job service client registered successfully", "clientID", cfg.ClientID)
-
-	return nil
+	return cfg, nil
 }
 
 func (p *Plugin) newJobService(serviceURL string) (*jobService, error) {
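With registerJobServiceClient folded away, the tail of getJobServiceClientConfig reduces to marshaling the config and persisting it; explicit client registration no longer happens here. A reconstruction from the kept and added lines of the hunk above (earlier parts of the function fall outside the hunk):

	cfgData, err := json.Marshal(&cfg)
	if err != nil {
		return cfg, fmt.Errorf("failed to marshal job service client config: %w", err)
	}

	// TODO: guard against the "locked out" corner case that the server/plugin process exits
	// before being able to store the credentials but after a successful
	// registration.
	// Saving auth credentials.
	p.metrics.IncStoreOp("KVSet")
	if err := p.API.KVSet(jobServiceConfigKey, cfgData); err != nil {
		return cfg, fmt.Errorf("failed to store job service client config: %w", err)
	}

	return cfg, nil
}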
@@ -215,27 +191,6 @@ func (p *Plugin) newJobService(serviceURL string) (*jobService, error) {
 		return nil, err
 	}
 
-	err = client.Login(cfg.ClientID, cfg.AuthKey)
-	if err == nil {
-		return &jobService{
-			ctx:    p,
-			client: client,
-		}, nil
-	}
-
-	// If login fails we attempt to re-register once as the jobs instance may
-	// have restarted, potentially losing stored credentials.
-	p.LogError("failed to login to job service", "err", err.Error())
-	p.LogDebug("attempting to re-register the job service client")
-
-	if err := p.registerJobServiceClient(cfg); err != nil {
-		return nil, fmt.Errorf("failed to register job service client: %w", err)
-	}
-
-	if err := client.Login(cfg.ClientID, cfg.AuthKey); err != nil {
-		return nil, fmt.Errorf("login failed: %w", err)
-	}
-
 	return &jobService{
 		ctx:    p,
 		client: client,
@@ -257,9 +212,9 @@ func (s *jobService) StopJob(channelID string) error {
 	return nil
 }
 
-func (s *jobService) UpdateJobRunner(runner string) error {
+func (s *jobService) Init(runner string) error {
 	// Here we need some coordination to avoid multiple plugin instances to
-	// update the runner concurrently.
+	// initialize the service concurrently.
 	mutex, err := cluster.NewMutex(s.ctx.API, s.ctx.metrics, "job_service_runner_update", cluster.MutexConfig{})
 	if err != nil {
 		return fmt.Errorf("failed to create cluster mutex: %w", err)
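The comment above is the crux of the rename: with several Mattermost nodes running the plugin, each instance may try to initialize the job service at the same time, so Init serializes that work behind a cluster-wide mutex. A minimal sketch of the pattern, assuming the mutex exposes blocking Lock/Unlock; only the signature and mutex setup appear in this hunk, the rest of the body is an assumption:

	// Sketch only: serialize job service initialization across plugin instances.
	mutex, err := cluster.NewMutex(s.ctx.API, s.ctx.metrics, "job_service_runner_update", cluster.MutexConfig{})
	if err != nil {
		return fmt.Errorf("failed to create cluster mutex: %w", err)
	}
	mutex.Lock()
	defer mutex.Unlock()
	// ... register the client and update the job runner image while holding the lock ...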
@@ -313,26 +268,7 @@ func (s *jobService) RunRecordingJob(callID, postID, recordingID, authToken stri
 	}
 
 	jb, err := s.client.CreateJob(jobCfg)
-
-	// Adding a check in case the service restarted and lost credentials. This is
-	// a common case when the offloader is running in kubernetes deployment. The
-	// solution is to re-initialize the service which will cause a new registration
-	// attempt.
-	// On top of that we need to implemente a re-try mechanism since each
-	// subsequent HTTP request could be hitting a different pod.
-	if errors.Is(err, offloader.ErrUnauthorized) {
-		data, err := s.ctx.retryJobService(func(c *offloader.Client) (any, error) {
-			return c.CreateJob(jobCfg)
-		})
-		if err != nil {
-			return "", err
-		}
-		jb, ok := data.(job.Job)
-		if !ok {
-			return "", fmt.Errorf("unexpected data found in place of job")
-		}
-		return jb.ID, nil
-	} else if err != nil {
+	if err != nil {
 		return "", err
 	}
 
@@ -374,55 +310,24 @@ func (p *Plugin) jobServiceVersionCheck(client *offloader.Client) error {
 
 func (p *Plugin) initJobService() error {
 	p.LogDebug("initializing job service")
-	_, err := p.retryJobService(nil)
-	return err
-}
-
-// retryJobService couples the Register -> Login -> Action sequence in a loop
-// to make sure it succeeds up to maxReinitializationAttempts.
-// This is needed in Kubernetes deployments where requests are potentially load
-// balanced to different pods and could fail due to the client not being
-// authenticated.
-// The passed callback function is used to perform arbitrary API actions that
-// require the client to be successfully logged in and re-attempted upon
-// failure.
-func (p *Plugin) retryJobService(cb func(client *offloader.Client) (any, error)) (any, error) {
-	waitBeforeRetry := func(err error, attempt int) {
-		p.LogError(err.Error())
-		time.Sleep(reinitializationAttemptInterval + time.Duration(rand.Intn(1000))*time.Millisecond)
-		p.LogWarn("attempting job service re-initialization", "attempt", fmt.Sprintf("%d", attempt))
-	}
-
-	for i := 0; i < maxReinitializationAttempts; i++ {
-		if jobService := p.getJobService(); jobService != nil {
-			if err := jobService.Close(); err != nil {
-				p.LogError("failed to close job service client", "err", err.Error())
-			}
-		}
-
-		jobService, err := p.newJobService(p.getConfiguration().getJobServiceURL())
-		if err != nil {
-			waitBeforeRetry(fmt.Errorf("failed to create job service: %w", err), i)
-			continue
-		}
-
-		p.mut.Lock()
-		p.jobService = jobService
-		p.mut.Unlock()
-
-		p.LogInfo("job service re-initialized successfully")
+	recorderVersion, ok := manifest.Props["calls_recorder_version"].(string)
+	if !ok {
+		return fmt.Errorf("failed to get recorder version from manifest")
+	}
+	recordingJobRunner = "mattermost/calls-recorder:" + recorderVersion
 
-		var data any
-		if cb != nil {
-			data, err = cb(jobService.client)
-			if err != nil {
-				waitBeforeRetry(fmt.Errorf("retry callback failed: %w", err), i)
-				continue
-			}
-		}
+	jobService, err := p.newJobService(p.getConfiguration().getJobServiceURL())
+	if err != nil {
+		return fmt.Errorf("failed to create job service: %w", err)
+	}
 
-		return data, nil
-	}
+	if err := jobService.Init(recordingJobRunner); err != nil {
+		return err
+	}
 
-	return nil, fmt.Errorf("max re-initialization attempts reached")
+	p.mut.Lock()
+	p.jobService = jobService
+	p.mut.Unlock()
+
+	return nil
 }
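Taken together, the added lines replace the retry loop with a single linear initJobService: resolve the recorder image from the plugin manifest, create the job service client, run Init (which does its work behind the cluster mutex shown earlier), and only then publish the instance under the plugin's mutex. The assembled function, as reconstructed from the additions above:

func (p *Plugin) initJobService() error {
	p.LogDebug("initializing job service")
	recorderVersion, ok := manifest.Props["calls_recorder_version"].(string)
	if !ok {
		return fmt.Errorf("failed to get recorder version from manifest")
	}
	recordingJobRunner = "mattermost/calls-recorder:" + recorderVersion

	jobService, err := p.newJobService(p.getConfiguration().getJobServiceURL())
	if err != nil {
		return fmt.Errorf("failed to create job service: %w", err)
	}

	if err := jobService.Init(recordingJobRunner); err != nil {
		return err
	}

	p.mut.Lock()
	p.jobService = jobService
	p.mut.Unlock()

	return nil
}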
