Skip to content

Commit

Permalink
[WIP] Log forwarding implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Jul 3, 2023
1 parent 30f520b commit e02e518
Show file tree
Hide file tree
Showing 7 changed files with 644 additions and 6 deletions.
55 changes: 55 additions & 0 deletions internals/overlord/logstate/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package logstate

import (
"bytes"
"fmt"
"io"

"github.com/canonical/pebble/internals/servicelog"
)

type fakeLogBuffer struct {
buf bytes.Buffer
}

func (f *fakeLogBuffer) Write(entry servicelog.Entry) {
f.buf.WriteString(entry.Time.String())
f.buf.WriteString(" ")
f.buf.WriteString(entry.Service)
f.buf.WriteString(" ")
f.buf.WriteString(entry.Message)
f.buf.WriteString("\n")
}

func (f *fakeLogBuffer) IsFull() bool {
return f.buf.Len() > 300
}

func (f *fakeLogBuffer) Request() io.Reader {
return &f.buf
}

func (f *fakeLogBuffer) Reset() {
f.buf.Reset()
}

type fakeLogClient struct{}

func (f *fakeLogClient) Send(body io.Reader) error {
data, _ := io.ReadAll(body)
fmt.Printf("%s", data)
return nil
}
73 changes: 73 additions & 0 deletions internals/overlord/logstate/forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package logstate

import (
"sync"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/servicelog"
)

type logForwarder struct {
serviceName string

mu sync.Mutex // mutex for gatherers
gatherers []*logGatherer

cancel chan struct{}
}

func newLogForwarder(serviceName string) *logForwarder {
f := &logForwarder{
serviceName: serviceName,
cancel: make(chan struct{}),
// TODO
}

return f
}

func (f *logForwarder) forward(buffer *servicelog.RingBuffer) {
iterator := buffer.HeadIterator(0)
// TODO: don't use the parser, just pull/write bytes from iterator
parser := servicelog.NewParser(iterator, 1024 /* TODO*/)

for iterator.Next(f.cancel) {
for parser.Next() {
entry := parser.Entry()
f.mu.Lock()
gatherers := f.gatherers
f.mu.Unlock()
for _, c := range gatherers {
c.addLog(entry)
//if err != nil {
// logger.Noticef("Cannot write log entry for service %q to %q: %v",
// f.service, c.Target().Name, err)}
}
}
if err := parser.Err(); err != nil {
logger.Noticef(
"Cannot read logs from service %q: %v",
f.serviceName, err)
}
}
}

func (f *logForwarder) stop() {
close(f.cancel)
}

// TODO
// should we have a forwarder for services w/ no log targets
145 changes: 145 additions & 0 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package logstate

import (
"io"
"sync"
"time"

"github.com/canonical/pebble/internals/logger"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

// logGatherer is responsible for collecting service logs from a forwarder,
// writing them to its internal logBuffer, and sending the request via its
// logClient.
// One logGatherer will run per log target. Its loop() method should be run
// in its own goroutine, while the addLog() method can be invoked in a
// separate goroutine by a logForwarder.
// The logGatherer will "flush" and send a request to the client when:
// - on a regular cadence (e.g. every 1 second)
// - when the buffer reaches a certain size
// - when it is told to shut down.
type logGatherer struct {
target *plan.LogTarget

tickPeriod time.Duration
ticker *time.Ticker

bufferLock sync.Mutex
buffer logBuffer
client logClient

writeCh chan struct{}
cancel chan struct{}
}

func newLogGatherer(target *plan.LogTarget) *logGatherer {
tickPeriod := 1 * time.Second

return &logGatherer{
target: target,
tickPeriod: tickPeriod,
ticker: time.NewTicker(tickPeriod),
buffer: newLogBuffer(target),
client: newLogClient(target),
writeCh: make(chan struct{}),
cancel: make(chan struct{}),
}
}

func (g *logGatherer) loop() {
defer g.ticker.Stop()

for {
select {
case <-g.ticker.C:
//fmt.Println("timeout")
g.flush(true)
case <-g.writeCh: // is the buffer full?
//fmt.Println("got a write")
g.flush(false)
case <-g.cancel:
//fmt.Println("cancelled")
g.flush(true)
return
}
}
}

func (g *logGatherer) addLog(entry servicelog.Entry) {
g.bufferLock.Lock()
defer g.bufferLock.Unlock()
g.buffer.Write(entry)

// Try to notify the control loop of a new write to the buffer.
// We don't want this method to block, so if the control loop is not ready
// to receive, then drop the notification.
select {
case g.writeCh <- struct{}{}:
default:
}
}

// flush obtains a lock on the buffer, prepares the request, sends to the
// remote server, and empties the buffer.
// If force is false, flush will check first if the buffer is full, and only
// flush if it is full.
func (g *logGatherer) flush(force bool) {
g.bufferLock.Lock()
defer g.bufferLock.Unlock()

if !force {
if !g.buffer.IsFull() {
return
}
}

err := g.client.Send(g.buffer.Request())
if err != nil {
logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err)
}

g.buffer.Reset()
}

// stop closes the cancel channel, thereby terminating the main loop.
func (g *logGatherer) stop() {
close(g.cancel)
}

// logBuffer is an interface which ...
// A logBuffer's methods may not be concurrency-safe. Callers should protect
// the logBuffer using a sync.Mutex.
type logBuffer interface {
Write(servicelog.Entry)
IsFull() bool
Request() io.Reader
Reset()
}

func newLogBuffer(target *plan.LogTarget) logBuffer {
return &fakeLogBuffer{}
}

type logClient interface {
Send(io.Reader) error
}

func newLogClient(target *plan.LogTarget) logClient {
return &fakeLogClient{}
}
48 changes: 48 additions & 0 deletions internals/overlord/logstate/gatherer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package logstate

import (
"time"

"github.com/canonical/pebble/internals/servicelog"
. "gopkg.in/check.v1"
)

type gathererSuite struct{}

var _ = Suite(&gathererSuite{})

func (s *gathererSuite) TestGathererTimeout(c *C) {
g := newLogGathererForTest(1 * time.Second)
go g.loop()

g.addLog(servicelog.Entry{
Time: time.Date(2023, 1, 1, 14, 34, 56, 789, nil),
Service: "foobar",
Message: "this is a log",
})
time.Sleep(1 * time.Second)
c.Assert(g.buffer.IsFull(), Equals, true)
}

func newLogGathererForTest(tickPeriod time.Duration) *logGatherer {
return &logGatherer{
tickPeriod: tickPeriod,
buffer: &fakeLogBuffer{},
client: &fakeLogClient{},
writeCh: make(chan struct{}),
cancel: make(chan struct{}),
}
}
Loading

0 comments on commit e02e518

Please sign in to comment.