Skip to content

Commit

Permalink
Extricate the Event types from history package
Browse files Browse the repository at this point in the history
Events are a thing the daemon and so on care about. But having a
database isn't, so extract the code dealing with storing history into
service/history/.
  • Loading branch information
squaremo committed Oct 18, 2017
1 parent a271a2a commit 19e4e94
Show file tree
Hide file tree
Showing 31 changed files with 215 additions and 191 deletions.
4 changes: 2 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/history"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/remote"
Expand All @@ -29,5 +29,5 @@ type Client interface {
type Upstream interface {
RegisterDaemon(context.Context, remote.Platform) error
IsDaemonConnected(context.Context) error
LogEvent(context.Context, history.Event) error
LogEvent(context.Context, event.Event) error
}
6 changes: 3 additions & 3 deletions cmd/fluxctl/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/weaveworks/flux/api"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/history"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/update"
)
Expand Down Expand Up @@ -46,8 +46,8 @@ func await(ctx context.Context, stdout, stderr io.Writer, client api.Client, job
}

// await polls for a job to have been completed, with exponential backoff.
func awaitJob(ctx context.Context, client api.Client, jobID job.ID) (history.CommitEventMetadata, error) {
var result history.CommitEventMetadata
func awaitJob(ctx context.Context, client api.Client, jobID job.ID) (event.CommitEventMetadata, error) {
var result event.CommitEventMetadata
err := backoff(100*time.Millisecond, 2, 50, 1*time.Minute, func() (bool, error) {
j, err := client.JobStatus(ctx, jobID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
"github.com/weaveworks/flux/daemon"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/history"
transport "github.com/weaveworks/flux/http"
daemonhttp "github.com/weaveworks/flux/http/daemon"
"github.com/weaveworks/flux/job"
Expand Down Expand Up @@ -298,7 +298,7 @@ func main() {

daemonRef := daemon.NewRef(notReadyDaemon)

var eventWriter history.EventWriter
var eventWriter event.EventWriter
{
// Connect to fluxsvc if given an upstream address
if *upstreamURL != "" {
Expand Down
9 changes: 5 additions & 4 deletions cmd/fluxsvc/fluxsvc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ import (

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/db"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/guid"
"github.com/weaveworks/flux/history"
historysql "github.com/weaveworks/flux/history/sql"
transport "github.com/weaveworks/flux/http"
"github.com/weaveworks/flux/http/client"
httpdaemon "github.com/weaveworks/flux/http/daemon"
Expand All @@ -27,6 +26,8 @@ import (
"github.com/weaveworks/flux/service"
"github.com/weaveworks/flux/service/bus"
"github.com/weaveworks/flux/service/bus/nats"
"github.com/weaveworks/flux/service/history"
historysql "github.com/weaveworks/flux/service/history/sql"
httpserver "github.com/weaveworks/flux/service/http"
"github.com/weaveworks/flux/service/instance"
instancedb "github.com/weaveworks/flux/service/instance/sql"
Expand Down Expand Up @@ -291,8 +292,8 @@ func TestFluxsvc_History(t *testing.T) {

ctx := context.Background()

err := apiClient.LogEvent(ctx, history.Event{
Type: history.EventLock,
err := apiClient.LogEvent(ctx, event.Event{
Type: event.EventLock,
ServiceIDs: []flux.ResourceID{
flux.MustParseResourceID(helloWorldSvc),
},
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxsvc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/spf13/pflag"

"github.com/weaveworks/flux/db"
"github.com/weaveworks/flux/history"
historysql "github.com/weaveworks/flux/history/sql"
"github.com/weaveworks/flux/service/bus"
"github.com/weaveworks/flux/service/bus/nats"
"github.com/weaveworks/flux/service/history"
historysql "github.com/weaveworks/flux/service/history/sql"
httpserver "github.com/weaveworks/flux/service/http"
"github.com/weaveworks/flux/service/instance"
instancedb "github.com/weaveworks/flux/service/instance/sql"
Expand Down
44 changes: 22 additions & 22 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/guid"
"github.com/weaveworks/flux/history"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/registry"
Expand Down Expand Up @@ -43,7 +43,7 @@ type Daemon struct {
Checkout *git.Checkout
Jobs *job.Queue
JobStatusCache *job.StatusCache
EventWriter history.EventWriter
EventWriter event.EventWriter
Logger log.Logger
// bookkeeping
*LoopVars
Expand Down Expand Up @@ -129,7 +129,7 @@ func (d *Daemon) ListImages(ctx context.Context, spec update.ResourceSpec) ([]fl
// Let's use the CommitEventMetadata as a convenient transport for the
// results of a job; if no commit was made (e.g., if it was a dry
// run), leave the revision field empty.
type DaemonJobFunc func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error)
type DaemonJobFunc func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*event.CommitEventMetadata, error)

// Must cancel the context once this job is complete
func (d *Daemon) queueJob(do DaemonJobFunc) job.ID {
Expand Down Expand Up @@ -165,12 +165,12 @@ func (d *Daemon) queueJob(do DaemonJobFunc) job.ID {
serviceIDs = append(serviceIDs, id)
}
}
return d.LogEvent(history.Event{
return d.LogEvent(event.Event{
ServiceIDs: serviceIDs,
Type: history.EventCommit,
Type: event.EventCommit,
StartedAt: started,
EndedAt: started,
LogLevel: history.LogLevelInfo,
LogLevel: event.LogLevelInfo,
Metadata: metadata,
})
}
Expand Down Expand Up @@ -199,10 +199,10 @@ func (d *Daemon) UpdateManifests(ctx context.Context, spec update.Spec) (job.ID,
}

func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJobFunc {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*event.CommitEventMetadata, error) {
// For each update
var serviceIDs []flux.ResourceID
metadata := &history.CommitEventMetadata{
metadata := &event.CommitEventMetadata{
Spec: &spec,
Result: update.Result{},
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJo
}

func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*event.CommitEventMetadata, error) {
rc := release.NewReleaseContext(d.Cluster, d.Manifests, d.Registry, working)
result, err := release.Release(rc, c, logger)
if err != nil {
Expand Down Expand Up @@ -310,7 +310,7 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
return nil, err
}
}
return &history.CommitEventMetadata{
return &event.CommitEventMetadata{
Revision: revision,
Spec: &spec,
Result: result,
Expand Down Expand Up @@ -354,7 +354,7 @@ func (d *Daemon) JobStatus(ctx context.Context, jobID job.ID) (job.Status, error
if note != nil && note.JobID == jobID {
return job.Status{
StatusString: job.StatusSucceeded,
Result: history.CommitEventMetadata{
Result: event.CommitEventMetadata{
Revision: commit.Revision,
Spec: &note.Spec,
Result: note.Result,
Expand Down Expand Up @@ -403,7 +403,7 @@ func unknownJobError(id job.ID) error {
return fmt.Errorf("unknown job %q", string(id))
}

func (d *Daemon) LogEvent(ev history.Event) error {
func (d *Daemon) LogEvent(ev event.Event) error {
if d.EventWriter == nil {
d.Logger.Log("event", ev, "logupstream", "false")
return nil
Expand Down Expand Up @@ -467,18 +467,18 @@ func policyCommitMessage(us policy.Updates, cause update.Cause) string {
// policyEvents builds a map of events (by type), for all the events in this set of
// updates. There will be one event per type, containing all service ids
// affected by that event. e.g. all automated services will share an event.
func policyEvents(us policy.Updates, now time.Time) map[string]history.Event {
eventsByType := map[string]history.Event{}
func policyEvents(us policy.Updates, now time.Time) map[string]event.Event {
eventsByType := map[string]event.Event{}
for serviceID, update := range us {
for _, eventType := range policyEventTypes(update) {
e, ok := eventsByType[eventType]
if !ok {
e = history.Event{
e = event.Event{
ServiceIDs: []flux.ResourceID{},
Type: eventType,
StartedAt: now,
EndedAt: now,
LogLevel: history.LogLevelInfo,
LogLevel: event.LogLevelInfo,
}
}
e.ServiceIDs = append(e.ServiceIDs, serviceID)
Expand All @@ -494,22 +494,22 @@ func policyEventTypes(u policy.Update) []string {
for p, _ := range u.Add {
switch {
case p == policy.Automated:
types[history.EventAutomate] = struct{}{}
types[event.EventAutomate] = struct{}{}
case p == policy.Locked:
types[history.EventLock] = struct{}{}
types[event.EventLock] = struct{}{}
default:
types[history.EventUpdatePolicy] = struct{}{}
types[event.EventUpdatePolicy] = struct{}{}
}
}

for p, _ := range u.Remove {
switch {
case p == policy.Automated:
types[history.EventDeautomate] = struct{}{}
types[event.EventDeautomate] = struct{}{}
case p == policy.Locked:
types[history.EventUnlock] = struct{}{}
types[event.EventUnlock] = struct{}{}
default:
types[history.EventUpdatePolicy] = struct{}{}
types[event.EventUpdatePolicy] = struct{}{}
}
}
var result []string
Expand Down
30 changes: 24 additions & 6 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"github.com/weaveworks/flux/cluster/kubernetes"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/git/gittest"
"github.com/weaveworks/flux/history"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/registry"
Expand Down Expand Up @@ -189,15 +189,15 @@ func TestDaemon_SyncNotify(t *testing.T) {
}

// Check that history was written to
var e []history.Event
var e []event.Event
w.Eventually(func() bool {
e, _ = events.AllEvents(time.Time{}, -1, time.Time{})
return len(e) > 0
}, "Waiting for new events")
if 1 != len(e) {
t.Fatal("Expected one log event from the sync, but got", len(e))
} else if history.EventSync != e[0].Type {
t.Fatalf("Expected event with type %s but got %s", history.EventSync, e[0].Type)
} else if event.EventSync != e[0].Type {
t.Fatalf("Expected event with type %s but got %s", event.EventSync, e[0].Type)
}
}

Expand Down Expand Up @@ -319,7 +319,7 @@ func TestDaemon_JobStatusWithNoCache(t *testing.T) {
w.ForJobSucceeded(d, id)
}

func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, history.EventReadWriter) {
func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter) {
logger := log.NewNopLogger()

singleService := cluster.Controller{
Expand Down Expand Up @@ -405,7 +405,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, history.EventRead
}, nil)
}

events := history.NewMock()
events := &mockEventWriter{}

// Shutdown chans and waitgroups
shutdown := make(chan struct{})
Expand Down Expand Up @@ -439,6 +439,24 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, history.EventRead
}, k8s, events
}

type mockEventWriter struct {
events []event.Event
sync.Mutex
}

func (w *mockEventWriter) LogEvent(e event.Event) error {
w.Lock()
defer w.Unlock()
w.events = append(w.events, e)
return nil
}

func (w *mockEventWriter) AllEvents(_ time.Time, _ int64, _ time.Time) ([]event.Event, error) {
w.Lock()
defer w.Unlock()
return w.events, nil
}

// DAEMON TEST HELPERS
type wait struct {
t *testing.T
Expand Down
Loading

0 comments on commit 19e4e94

Please sign in to comment.