Skip to content

Commit

Permalink
Refactor job controller and websocket logic. (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
nishkrishnan authored Dec 2, 2021
1 parent 9fd8043 commit 8a79e1b
Show file tree
Hide file tree
Showing 27 changed files with 493 additions and 1,047 deletions.
2 changes: 1 addition & 1 deletion server/controllers/events/events_controller_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
"github.com/runatlantis/atlantis/server/events/webhooks"
"github.com/runatlantis/atlantis/server/events/yaml"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"strconv"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
stats "github.com/lyft/gostats"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/controllers/templates"
"github.com/runatlantis/atlantis/server/controllers/websocket"
"github.com/runatlantis/atlantis/server/core/db"
"github.com/runatlantis/atlantis/server/events/metrics"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
)

Expand All @@ -25,10 +25,20 @@ type JobsController struct {
ProjectJobsTemplate templates.TemplateWriter
ProjectJobsErrorTemplate templates.TemplateWriter
Db *db.BoltDB
WsMux *websocket.Multiplexor
StatsScope stats.Scope
}

type ProjectInfoKeyGenerator struct{}

func (g ProjectInfoKeyGenerator) Generate(r *http.Request) (string, error) {
projectInfo, err := newProjectInfo(r)

if err != nil {
return "", errors.Wrap(err, "creating project info")
}

WebsocketHandler handlers.WebsocketHandler
ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler
StatsScope stats.Scope
return projectInfo.String(), nil
}

type pullInfo struct {
Expand Down Expand Up @@ -132,37 +142,9 @@ func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request)
}

func (j *JobsController) getProjectJobsWS(w http.ResponseWriter, r *http.Request) error {
projectInfo, err := newProjectInfo(r)
if err != nil {
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return err
}

c, err := j.WebsocketHandler.Upgrade(w, r, nil)
if err != nil {
j.Logger.Warn("Failed to upgrade websocket: %s", err)
return err
}

// Buffer size set to 1000 to ensure messages get queued (upto 1000) if the receiverCh is not ready to
// receive messages before the channel is closed and resources cleaned up.
receiver := make(chan string, 1000)
j.WebsocketHandler.SetCloseHandler(c, receiver)

// Add a reader goroutine to listen for socket.close() events.
go j.WebsocketHandler.SetReadHandler(c)

pull := projectInfo.String()
err = j.ProjectCommandOutputHandler.Receive(pull, receiver, func(msg string) error {
if err := c.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
j.Logger.Warn("Failed to write ws message: %s", err)
return err
}
return nil
})
err := j.WsMux.Handle(w, r)

if err != nil {
j.Logger.Warn("Failed to receive message: %s", err)
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return err
}
Expand Down
47 changes: 0 additions & 47 deletions server/controllers/logstreaming_controller_test.go

This file was deleted.

63 changes: 63 additions & 0 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package websocket

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/logging"
)

// PartitionKeyGenerator generates partition keys for the multiplexor
type PartitionKeyGenerator interface {
Generate(r *http.Request) (string, error)
}

// PartitionRegistry is the registry holding each partition
// and is responsible for registering/deregistering new buffers
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
}

// Multiplexor is responsible for handling the data transfer between the storage layer
// and the registry. Note this is still a WIP as right now the registry is assumed to handle
// everything.
type Multiplexor struct {
writer *Writer
keyGenerator PartitionKeyGenerator
registry PartitionRegistry
}

func NewMultiplexor(log logging.SimpleLogging, keyGenerator PartitionKeyGenerator, registry PartitionRegistry) *Multiplexor {
upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
return &Multiplexor{
writer: &Writer{
upgrader: upgrader,
log: log,
},
keyGenerator: keyGenerator,
registry: registry,
}
}

// Handle should be called for a given websocket request. It blocks
// while writing to the websocket until the buffer is closed.
func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
key, err := m.keyGenerator.Generate(r)

if err != nil {
return errors.Wrapf(err, "generating partition key")
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)

// spinning up a goroutine for this since we are attempting to block on the read side.
go m.registry.Register(key, buffer)
defer m.registry.Deregister(key, buffer)

return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key)
}
70 changes: 70 additions & 0 deletions server/controllers/websocket/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package websocket

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/logging"
)

func NewWriter(log logging.SimpleLogging) *Writer {
upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
return &Writer{
upgrader: upgrader,
log: log,
}
}

type Writer struct {
upgrader websocket.Upgrader

//TODO: Remove dependency on atlantis logger here if we upstream this.
log logging.SimpleLogging
}

func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan string) error {
conn, err := w.upgrader.Upgrade(rw, r, nil)

if err != nil {
return errors.Wrap(err, "upgrading websocket connection")
}

conn.SetCloseHandler(func(code int, text string) error {
// Close the channnel after websocket connection closed.
// Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup.
// is it good practice to close at the receiver? Probably not, we should figure out a better
// way to handle this case
close(input)
return nil
})

// Add a reader goroutine to listen for socket.close() events.
go w.setReadHandler(conn)

// block on reading our input channel
for msg := range input {
if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
w.log.Warn("Failed to write ws message: %s", err)
return err
}
}

return nil
}

func (w *Writer) setReadHandler(c *websocket.Conn) {
for {
_, _, err := c.ReadMessage()
if err != nil {
// CloseGoingAway (1001) when a browser tab is closed.
// Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
w.log.Warn("Failed to read WS message: %s", err)
}
return
}
}

}
2 changes: 1 addition & 1 deletion server/core/terraform/terraform_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (

"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/terraform/ansi"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
)

var LogStreamingValidCmds = [...]string{"init", "plan", "apply"}
Expand Down
4 changes: 2 additions & 2 deletions server/core/terraform/terraform_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
version "github.com/hashicorp/go-version"
. "github.com/petergtz/pegomock"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
4 changes: 2 additions & 2 deletions server/core/terraform/terraform_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/runatlantis/atlantis/server/core/terraform"
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
2 changes: 1 addition & 1 deletion server/events/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/vcs"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
"github.com/runatlantis/atlantis/server/recovery"
gitlab "github.com/xanzy/go-gitlab"
)
Expand Down
4 changes: 2 additions & 2 deletions server/events/command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/models/fixtures"
vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
2 changes: 1 addition & 1 deletion server/events/project_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/webhooks"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
)

// DirNotExistErr is an error caused by the directory not existing.
Expand Down
4 changes: 2 additions & 2 deletions server/events/project_command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/runatlantis/atlantis/server/events/mocks/matchers"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
Loading

0 comments on commit 8a79e1b

Please sign in to comment.