Skip to content

Commit

Permalink
Implement queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianrakel authored and bastelfreak committed Jun 22, 2023
1 parent 9902934 commit 08c773b
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 13 deletions.
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ server:
enabled: false
certificate: "/path/to/tls/certificate"
key: "/path/to/tls/key"
queue:
enabled: true
max_concurrent_jobs: 10
max_history_items: 20
chatops:
enabled: false
service: slack
Expand Down Expand Up @@ -107,6 +111,28 @@ Type: string
Description: Full path to key file. Optional.
Default: `nil`

#### `queue`

Type: struct
Description: Struct containing Queue options

##### `enabled`

Type: bool
Description: Should queuing be used
Default: `false`

##### `max_concurrent_jobs`

Type: int
Description: How many jobs could be stored in queue
Default: `10`

##### `max_history_items`
Type: int
Description: How many queue items should be stored in the history
Default: `50`

### ChatOps options

#### `enabled`
Expand Down Expand Up @@ -188,4 +214,3 @@ Default: `true`
Type: bool
Description: Run `puppet generate types` after updating an environment
Default: `true`

20 changes: 16 additions & 4 deletions api/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/voxpupuli/webhook-go/config"
"github.com/voxpupuli/webhook-go/lib/helpers"
"github.com/voxpupuli/webhook-go/lib/parsers"
"github.com/voxpupuli/webhook-go/lib/queue"
)

// Environment Controller
Expand All @@ -29,7 +30,7 @@ func (e EnvironmentController) DeployEnvironment(c *gin.Context) {
conf := config.GetConfig()

// Setup chatops connection so we don't have to repeat the process
conn := chatopsSetup()
conn := helpers.ChatopsSetup()

// Parse the data from the request and error if the parsing fails
err := data.ParseData(c)
Expand Down Expand Up @@ -76,19 +77,30 @@ func (e EnvironmentController) DeployEnvironment(c *gin.Context) {
//
// On success this will:
// * Respond with an HTTP 202 and the result in JSON format
res, err := execute(cmd)
if err != nil {
log.Errorf("failed to execute local command `%s` with error: `%s` `%s`", cmd, err, res)

var res interface{}
if conf.Server.Queue.Enabled {
res, err = queue.AddToQueue("env", env, cmd)
} else {
res, err = helpers.Execute(cmd)

if err != nil {
log.Errorf("failed to execute local command `%s` with error: `%s` `%s`", cmd, err, res)
}
}

if err != nil {
c.JSON(http.StatusInternalServerError, res)
c.Abort()
if conf.ChatOps.Enabled {
conn.PostMessage(http.StatusInternalServerError, env)
}
return
}

c.JSON(http.StatusAccepted, res)
if conf.ChatOps.Enabled {
conn.PostMessage(http.StatusAccepted, env)
}

}
18 changes: 14 additions & 4 deletions api/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/voxpupuli/webhook-go/config"
"github.com/voxpupuli/webhook-go/lib/helpers"
"github.com/voxpupuli/webhook-go/lib/parsers"
"github.com/voxpupuli/webhook-go/lib/queue"
)

// Module Controller
Expand All @@ -28,7 +29,7 @@ func (m ModuleController) DeployModule(c *gin.Context) {
conf := config.GetConfig()

// Setup chatops connection so we don't have to repeat the process
conn := chatopsSetup()
conn := helpers.ChatopsSetup()

// Parse the data from the request and error if parsing fails
err := data.ParseData(c)
Expand Down Expand Up @@ -61,17 +62,26 @@ func (m ModuleController) DeployModule(c *gin.Context) {
//
// On success this will:
// * Respond with an HTTP 202 and the result in JSON format
res, err := execute(cmd)
if err != nil {
log.Errorf("failed to execute local command `%s` with error: `%s` `%s`", cmd, err, res)
var res interface{}
if conf.Server.Queue.Enabled {
res, err = queue.AddToQueue("module", data.ModuleName, cmd)
} else {
res, err = helpers.Execute(cmd)

if err != nil {
log.Errorf("failed to execute local command `%s` with error: `%s` `%s`", cmd, err, res)
}
}

if err != nil {
c.JSON(http.StatusInternalServerError, res)
c.Abort()
if conf.ChatOps.Enabled {
conn.PostMessage(http.StatusInternalServerError, data.ModuleName)
}
return
}

c.JSON(http.StatusAccepted, res)
if conf.ChatOps.Enabled {
conn.PostMessage(http.StatusAccepted, data.ModuleName)
Expand Down
16 changes: 16 additions & 0 deletions api/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package api

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/voxpupuli/webhook-go/lib/queue"
)

// Queue Controller
type QueueController struct{}

// QueueStatus takes in the current Gin context and show the current queue status
func (q QueueController) QueueStatus(c *gin.Context) {
c.JSON(http.StatusOK, queue.GetQueueItems())
}
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type Config struct {
Certificate string `mapstructure:"certificate"`
Key string `mapstructure:"key"`
} `mapstructure:"tls"`
Queue struct {
Enabled bool `mapstructure:"enabled"`
MaxConcurrentJobs int `mapstructure:"max_concurrent_jobs"`
MaxHistoryItems int `mapstructure:"max_history_items"`
} `mapstructure:"queue"`
} `mapstructure:"server"`
ChatOps struct {
Enabled bool `mapstructure:"enabled"`
Expand Down Expand Up @@ -72,6 +77,8 @@ func setDefaults(v *viper.Viper) *viper.Viper {
v.SetDefault("server.port", 4000)
v.SetDefault("server.protected", false)
v.SetDefault("server.tls_enabled", false)
v.SetDefault("server.queue.max_concurrent_jobs", 10)
v.SetDefault("server.queue.max_history_items", 50)
v.SetDefault("chatops.enabled", false)
v.SetDefault("r10k.command_path", "/opt/puppetlabs/puppetserver/bin/r10k")
v.SetDefault("r10k.config_path", "/etc/puppetlabs/r10k/r10k.yaml")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/go-playground/webhooks/v6 v6.1.0
github.com/google/go-github/v39 v39.2.0
github.com/mcdafydd/go-azuredevops v0.12.1
github.com/google/uuid v1.1.2
github.com/pandatix/gocket-chat v0.1.0-alpha
github.com/proclaim/mock-slack v0.0.0-20201019114328-0aae156a5005
github.com/sirupsen/logrus v1.9.3
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down
16 changes: 12 additions & 4 deletions api/functions.go → lib/helpers/execute.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package api
package helpers

import (
"os/exec"
Expand All @@ -7,7 +7,7 @@ import (
"github.com/voxpupuli/webhook-go/lib/chatops"
)

func chatopsSetup() *chatops.ChatOps {
func ChatopsSetup() *chatops.ChatOps {
conf := config.GetConfig().ChatOps
c := chatops.ChatOps{
Service: conf.Service,
Expand All @@ -21,7 +21,15 @@ func chatopsSetup() *chatops.ChatOps {
}

// This returns an interface of the result of the execution and an error
func execute(cmd []string) (interface{}, error) {
func Execute(cmd []string) (interface{}, error) {
var res interface{}
var err error

res, err = localExec(cmd)
return res, err
}

func localExec(cmd []string) (string, error) {
args := cmd[1:]
command := exec.Command(cmd[0], args...)

Expand All @@ -30,5 +38,5 @@ func execute(cmd []string) (interface{}, error) {
return string(res), err
}

return res, nil
return string(res), nil
}
132 changes: 132 additions & 0 deletions lib/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package queue

import (
"fmt"
"net/http"
"sync"
"time"

log "github.com/sirupsen/logrus"

"github.com/google/uuid"
"github.com/voxpupuli/webhook-go/config"
"github.com/voxpupuli/webhook-go/lib/helpers"
)

type Queue struct {
Items []*QueueItem
wg sync.WaitGroup
jobChan chan *QueueItem
}

type QueueItem struct {
Id uuid.UUID
Name string
CommandType string
AddedAt time.Time
StartedAt time.Time
FinishedAt time.Time
Command []string
Response interface{}
State string
}

var queue = &Queue{}

func GetQueueItems() []*QueueItem {
return queue.Items
}

func AddToQueue(commandType string, name string, command []string) (*QueueItem, error) {
id, err := uuid.NewUUID()
if err != nil {
return nil, err
}

queueItem := QueueItem{
Id: id,
AddedAt: time.Now(),
Command: command,
CommandType: commandType,
Name: name,
State: "added",
}

queue.Items = append(queue.Items, &queueItem)

if !queueJob(&queueItem) {
queueItem.State = "queue full"
return &queueItem, fmt.Errorf("queue is full")
}

trimItems()

return &queueItem, nil
}

func trimItems() {
conf := config.GetConfig()

if conf.Server.Queue.MaxHistoryItems == 0 {
return
}

if len(queue.Items) > conf.Server.Queue.MaxHistoryItems {
queue.Items = queue.Items[:conf.Server.Queue.MaxHistoryItems]
}
}

func Work() error {
conf := config.GetConfig()
log.Printf("start queue with %d jobs", conf.Server.Queue.MaxConcurrentJobs)

queue.jobChan = make(chan *QueueItem, conf.Server.Queue.MaxConcurrentJobs)
queue.wg.Add(1)
queue.Items = []*QueueItem{}

go worker()
return nil
}

func Dispose() {
close(queue.jobChan)
}

func queueJob(command *QueueItem) bool {
select {
case queue.jobChan <- command:
return true
default:
return false
}
}

func worker() {
defer queue.wg.Done()

log.Println("Worker is waiting for jobs")

conf := config.GetConfig()
conn := helpers.ChatopsSetup()

for job := range queue.jobChan {
log.Println("Worker picked Job", job.Id)
job.StartedAt = time.Now()

res, err := helpers.Execute(job.Command)
job.Response = res

job.FinishedAt = time.Now()
if err != nil {
log.Errorf("failed to execute local command `%s` with error: `%s` `%s`", job.Command, err, res)

if conf.ChatOps.Enabled {
conn.PostMessage(http.StatusInternalServerError, job.Name)
}
job.State = "failed"
continue
}

job.State = "success"
}
}
6 changes: 6 additions & 0 deletions server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ func NewRouter() *gin.Engine {
environment := new(wapi.EnvironmentController)
r10k.POST("/environment", environment.DeployEnvironment)
}

queue := v1.Group("queue")
{
q := new(wapi.QueueController)
queue.GET("", q.QueueStatus)
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ import (
"fmt"

"github.com/voxpupuli/webhook-go/config"
"github.com/voxpupuli/webhook-go/lib/queue"
)

// The Init function starts the Server on a specific port
func Init() {
config := config.GetConfig().Server

if config.Queue.Enabled {
queue.Work()
}

r := NewRouter()
if config.TLS.Enabled {
r.RunTLS(":"+fmt.Sprint(config.Port), config.TLS.Certificate, config.TLS.Key)
Expand Down

0 comments on commit 08c773b

Please sign in to comment.