Skip to content

Commit

Permalink
update TestLogManager
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed May 22, 2023
1 parent cfaa625 commit b3ed54c
Showing 1 changed file with 125 additions and 111 deletions.
236 changes: 125 additions & 111 deletions internal/overlord/logstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ package logstate

import (
"bytes"
"fmt"
"sync"
"time"

"github.com/canonical/pebble/internal/logger"
"github.com/canonical/pebble/internal/plan"
"github.com/canonical/pebble/internal/servicelog"
"github.com/canonical/pebble/internal/testutil"
. "gopkg.in/check.v1"
"sync"
)

type managerSuite struct {
Expand All @@ -41,22 +39,22 @@ func (s *managerSuite) TearDownTest(c *C) {
}

func (s *managerSuite) TestLogManager(c *C) {
m := NewLogManagerForTest()
m := newLogManagerForTest()
// Fake ringbuffer so that log manager can create forwarders
rb := servicelog.RingBuffer{}

// Call PlanChanged with new plan
m.PlanChanged(&plan.Plan{
Services: map[string]*plan.Service{
"svc1": {},
"svc2": {LogTargets: []string{"optin", "disabled"}},
"svc3": {LogTargets: []string{"unset"}},
"svc2": {LogTargets: []string{"tgt3", "tgt4"}},
"svc3": {LogTargets: []string{"tgt1"}},
},
LogTargets: map[string]*plan.LogTarget{
"unset": {Name: "unset", Type: plan.LokiTarget, Selection: plan.UnsetSelection},
"optout": {Name: "optout", Type: plan.LokiTarget, Selection: plan.OptOutSelection},
"optin": {Name: "optin", Type: plan.LokiTarget, Selection: plan.OptInSelection},
"disabled": {Name: "disabled", Type: plan.LokiTarget, Selection: plan.DisabledSelection},
"tgt1": {Name: "tgt1", Type: plan.LokiTarget, Selection: plan.UnsetSelection},
"tgt2": {Name: "tgt2", Type: plan.LokiTarget, Selection: plan.OptOutSelection},
"tgt3": {Name: "tgt3", Type: plan.LokiTarget, Selection: plan.OptInSelection},
"tgt4": {Name: "tgt4", Type: plan.LokiTarget, Selection: plan.DisabledSelection},
},
})

Expand All @@ -78,125 +76,141 @@ func (s *managerSuite) TestLogManager(c *C) {
}()

wg.Wait()
c.Check(m.forwarders, HasLen, 4)
checkForwarderExists(c, m.forwarders, "svc1", "unset")
checkForwarderExists(c, m.forwarders, "svc1", "optout")
checkForwarderExists(c, m.forwarders, "svc2", "optin")
checkForwarderExists(c, m.forwarders, "svc3", "unset")
checkForwarders(c, m.forwarders, map[string][]string{
"svc1": {"tgt1", "tgt2"},
"svc2": {"tgt3"},
"svc3": {"tgt1"},
})
checkGatherers(c, m.gatherers, []string{"tgt1", "tgt2", "tgt3"})

// Update the plan
m.PlanChanged(&plan.Plan{
Services: map[string]*plan.Service{
"svc1": {},
"svc2": {LogTargets: []string{"optout", "disabled"}},
"svc4": {LogTargets: []string{"optin"}},
"svc2": {LogTargets: []string{"tgt2", "tgt4"}},
"svc4": {LogTargets: []string{"tgt3"}},
},
LogTargets: map[string]*plan.LogTarget{
"unset": {Name: "unset", Type: plan.LokiTarget, Selection: plan.UnsetSelection},
"optout": {Name: "optout", Type: plan.LokiTarget, Selection: plan.OptOutSelection},
"optin": {Name: "optin", Type: plan.LokiTarget, Selection: plan.OptInSelection},
"disabled": {Name: "disabled", Type: plan.LokiTarget, Selection: plan.DisabledSelection},
"tgt1": {Name: "tgt1", Type: plan.LokiTarget, Selection: plan.UnsetSelection},
"tgt2": {Name: "tgt2", Type: plan.LokiTarget, Selection: plan.OptOutSelection},
"tgt3": {Name: "tgt3", Type: plan.LokiTarget, Selection: plan.OptInSelection},
"tgt4": {Name: "tgt4", Type: plan.LokiTarget, Selection: plan.DisabledSelection},
},
})

// Call ServiceStarted
m.ServiceStarted("svc4", &rb)

c.Check(m.forwarders, HasLen, 4)
checkForwarderExists(c, m.forwarders, "svc1", "unset")
checkForwarderExists(c, m.forwarders, "svc1", "optout")
checkForwarderExists(c, m.forwarders, "svc2", "optout")
checkForwarderExists(c, m.forwarders, "svc4", "optin")
checkForwarders(c, m.forwarders, map[string][]string{
"svc1": {"tgt1", "tgt2"},
"svc2": {"tgt2"},
"svc4": {"tgt3"},
})
checkGatherers(c, m.gatherers, []string{"tgt1", "tgt2", "tgt3"})
}

// checkForwarderExists checks that a forwarder for the given service and
// target exists in the provided slice of forwarders.
func checkForwarderExists(c *C, forwarders []*logForwarder, serviceName, targetName string) {
for _, f := range forwarders {
if f.service == serviceName && f.target.Name == targetName {
return
// checkForwarders checks that the arrangement of forwarders -> gatherers is
// as described in the provided map.
func checkForwarders(c *C, forwarders map[string]*logForwarder, expected map[string][]string) {
c.Check(len(forwarders), Equals, len(expected))
for serviceName := range expected {
forwarder, ok := forwarders[serviceName]
c.Assert(ok, Equals, true)
c.Assert(forwarder, Not(IsNil))
//forwarder.mu.Lock() - race still passes without this
c.Check(len(forwarder.gatherers), Equals, len(expected[serviceName]))
for _, gatherer := range forwarder.gatherers {
c.Check(expected[serviceName], testutil.Contains, gatherer.target.Name)
}
//forwarder.mu.Unlock()
}
c.Errorf("no forwarder found with service: %q, target: %q", serviceName, targetName)
}

func (s *managerSuite) TestNoLogDuplication(c *C) {
// Reduce Loki flush time
flushDelayOld := flushDelay
flushDelay = 10 * time.Millisecond
defer func() {
flushDelay = flushDelayOld
}()

m := NewLogManager()
rb := servicelog.NewRingBuffer(1024)

// Set up fake "Loki" server
requests := make(chan string, 2)
srv := newFakeLokiServer(requests)
defer srv.Close()

// Utility functions for this test
writeLog := func(timestamp time.Time, logLine string) {
_, err := fmt.Fprintf(rb, "%s [svc1] %s\n",
timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine)
c.Assert(err, IsNil)
// checkGatherers checks that the expected gatherers exist.
func checkGatherers(c *C, gatherers map[string]*logGatherer, expected []string) {
c.Check(len(gatherers), Equals, len(expected))
for targetName := range gatherers {
c.Check(expected, testutil.Contains, targetName)
}
expectLogs := func(expected string) {
select {
case req := <-requests:
c.Assert(req, Equals, expected)
case <-time.After(1 * time.Second):
c.Fatalf("timed out waiting for request %q", expected)
}
}

m.PlanChanged(&plan.Plan{
Services: map[string]*plan.Service{
"svc1": {},
},
LogTargets: map[string]*plan.LogTarget{
"unset": {
Type: plan.LokiTarget,
Location: srv.URL(),
Selection: plan.UnsetSelection,
},
},
})
m.ServiceStarted("svc1", rb)
c.Assert(m.forwarders, HasLen, 1)

// Write logs
writeLog(time.Date(2023, 1, 31, 1, 23, 45, 67890, time.UTC), "log line #1")
writeLog(time.Date(2023, 1, 31, 1, 23, 46, 67890, time.UTC), "log line #2")
expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128225000000000","log line #1"],["1675128226000000000","log line #2"]]}]}`)

// Call PlanChanged again
m.PlanChanged(&plan.Plan{
Services: map[string]*plan.Service{
"svc1": {},
},
LogTargets: map[string]*plan.LogTarget{
"unset": {
Type: plan.LokiTarget,
Location: srv.URL(),
Selection: plan.UnsetSelection,
},
},
})
c.Check(m.forwarders, HasLen, 1)

// Write logs
writeLog(time.Date(2023, 1, 31, 1, 23, 47, 67890, time.UTC), "log line #3")
writeLog(time.Date(2023, 1, 31, 1, 23, 48, 67890, time.UTC), "log line #4")
expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128227000000000","log line #3"],["1675128228000000000","log line #4"]]}]}`)
}

func NewLogManagerForTest() *LogManager {
return &LogManager{
forwarders: map[string]*logForwarder{},
gatherers: map[string]*logGatherer{},
newForwarder: newLogForwarderForTest,
newGatherer: newLogGathererForTest,
}
//func (s *managerSuite) TestNoLogDuplication(c *C) {
// // Reduce Loki flush time
// flushDelayOld := flushDelay
// flushDelay = 10 * time.Millisecond
// defer func() {
// flushDelay = flushDelayOld
// }()
//
// m := NewLogManager()
// rb := servicelog.NewRingBuffer(1024)
//
// // Set up fake "Loki" server
// requests := make(chan string, 2)
// srv := newFakeLokiServer(requests)
// defer srv.Close()
//
// // Utility functions for this test
// writeLog := func(timestamp time.Time, logLine string) {
// _, err := fmt.Fprintf(rb, "%s [svc1] %s\n",
// timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine)
// c.Assert(err, IsNil)
// }
// expectLogs := func(expected string) {
// select {
// case req := <-requests:
// c.Assert(req, Equals, expected)
// case <-time.After(1 * time.Second):
// c.Fatalf("timed out waiting for request %q", expected)
// }
// }
//
// m.PlanChanged(&plan.Plan{
// Services: map[string]*plan.Service{
// "svc1": {},
// },
// LogTargets: map[string]*plan.LogTarget{
// "tgt1": {
// Type: plan.LokiTarget,
// Location: srv.URL(),
// Selection: plan.UnsetSelection,
// },
// },
// })
// m.ServiceStarted("svc1", rb)
// c.Assert(m.forwarders, HasLen, 1)
//
// // Write logs
// writeLog(time.Date(2023, 1, 31, 1, 23, 45, 67890, time.UTC), "log line #1")
// writeLog(time.Date(2023, 1, 31, 1, 23, 46, 67890, time.UTC), "log line #2")
// expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128225000000000","log line #1"],["1675128226000000000","log line #2"]]}]}`)
//
// // Call PlanChanged again
// m.PlanChanged(&plan.Plan{
// Services: map[string]*plan.Service{
// "svc1": {},
// },
// LogTargets: map[string]*plan.LogTarget{
// "tgt1": {
// Type: plan.LokiTarget,
// Location: srv.URL(),
// Selection: plan.UnsetSelection,
// },
// },
// })
// c.Check(m.forwarders, HasLen, 1)
//
// // Write logs
// writeLog(time.Date(2023, 1, 31, 1, 23, 47, 67890, time.UTC), "log line #3")
// writeLog(time.Date(2023, 1, 31, 1, 23, 48, 67890, time.UTC), "log line #4")
// expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128227000000000","log line #3"],["1675128228000000000","log line #4"]]}]}`)
//}

func newLogManagerForTest() *LogManager {
return NewLogManager()
//return &LogManager{
// forwarders: map[string]*logForwarder{},
// gatherers: map[string]*logGatherer{},
// newForwarder: newLogForwarderForTest,
// newGatherer: newLogGathererForTest,
//}
}

0 comments on commit b3ed54c

Please sign in to comment.