Skip to content

Commit

Permalink
feat(dockerlog): add package to query logs from Docker daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Aug 18, 2024
1 parent bc4577c commit 74227df
Show file tree
Hide file tree
Showing 6 changed files with 559 additions and 0 deletions.
Binary file added internal/dockerlog/_testdata/dockerlog.bin
Binary file not shown.
120 changes: 120 additions & 0 deletions internal/dockerlog/daemonlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package dockerlog

import (
"bytes"
"encoding/binary"
"io"
"strings"
"time"

"github.com/go-faster/errors"

"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/otelstorage"
)

// ParseLog parses log stream from Docker daemon.
func ParseLog(f io.ReadCloser, resource otelstorage.Attrs) logqlengine.EntryIterator {
return &streamIter{
rd: f,
err: nil,
resource: resource,
}
}

const headerLen = 8

type streamIter struct {
rd io.ReadCloser
header [headerLen]byte
buf bytes.Buffer
err error

resource otelstorage.Attrs
}

var _ logqlengine.EntryIterator = (*streamIter)(nil)

// Next returns true, if there is element and fills t.
func (i *streamIter) Next(r *logqlengine.Entry) (ok bool) {
// Reset entry.
*r = logqlengine.Entry{
Set: logqlabels.NewLabelSet(),
}

ok, i.err = i.parseNext(r)
return ok
}

type stdType byte

const (
// Stdin represents standard input stream type.
stdin stdType = iota
// Stdout represents standard output stream type.
stdout
// Stderr represents standard error steam type.
stderr
// Systemerr represents errors originating from the system that make it
// into the multiplexed stream.
systemerr
)

func (i *streamIter) parseNext(r *logqlengine.Entry) (bool, error) {
if _, err := io.ReadFull(i.rd, i.header[:]); err != nil {
switch err {
case io.EOF, io.ErrUnexpectedEOF:
// Handle missing header gracefully, docker-cli does the same thing.
return false, nil
default:
return false, errors.Wrap(err, "read header")
}
}

var (
typ = stdType(i.header[0])
frameSize = binary.BigEndian.Uint32(i.header[4:8])
)
i.buf.Reset()
if _, err := io.CopyN(&i.buf, i.rd, int64(frameSize)); err != nil {
return false, errors.Wrap(err, "read message")
}
if typ == systemerr {
return false, errors.Errorf("daemon log stream error: %q", &i.buf)
}

if err := parseDockerLine(typ, i.buf.String(), r); err != nil {
return false, errors.Wrap(err, "parse log line")
}
r.Set.SetAttrs(i.resource)

return true, nil
}

func parseDockerLine(_ stdType, input string, r *logqlengine.Entry) error {
const dockerTimestampFormat = time.RFC3339Nano

rawTimestamp, line, ok := strings.Cut(input, " ")
if !ok {
return errors.New("invalid line: no space between timestamp and message")
}
r.Line = line

ts, err := time.Parse(dockerTimestampFormat, rawTimestamp)
if err != nil {
return errors.Wrap(err, "parse timestamp")
}
r.Timestamp = otelstorage.NewTimestampFromTime(ts)
return nil
}

// Err returns an error caused during iteration, if any.
func (i *streamIter) Err() error {
return i.err
}

// Close closes iterator.
func (i *streamIter) Close() error {
return i.rd.Close()
}
40 changes: 40 additions & 0 deletions internal/dockerlog/daemonlog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package dockerlog

import (
"os"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/otelstorage"
)

func TestParseLog(t *testing.T) {
f, err := os.Open("_testdata/dockerlog.bin")
require.NoError(t, err)
defer f.Close()

iter := ParseLog(f, otelstorage.Attrs(pcommon.NewMap()))
defer iter.Close()

expected := []logqlengine.Entry{
{Timestamp: 1707644252033031260, Line: "time=\"2024-02-11T09:37:32.032946602Z\" level=warning msg=\"No HTTP secret provided - generated random secret. This may cause problems with uploads if multiple registries are behind a load-balancer. To provide a shared secret, fill in http.secret in the configuration file or set the REGISTRY_HTTP_SECRET environment variable.\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
{Timestamp: 1707644252033058840, Line: "time=\"2024-02-11T09:37:32.032982092Z\" level=info msg=\"redis not configured\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
{Timestamp: 1707644252033079609, Line: "time=\"2024-02-11T09:37:32.03304416Z\" level=info msg=\"using inmemory blob descriptor cache\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
{Timestamp: 1707644252033097289, Line: "time=\"2024-02-11T09:37:32.03303576Z\" level=info msg=\"Starting upload purge in 4m0s\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
{Timestamp: 1707644252033198626, Line: "time=\"2024-02-11T09:37:32.033175887Z\" level=info msg=\"listening on [::]:5000\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
}

var (
r logqlengine.Entry
i int
)
for iter.Next(&r) {
require.Equal(t, expected[i].Timestamp, r.Timestamp)
require.Equal(t, expected[i].Line, r.Line)
i++
}
require.NoError(t, iter.Err())
}
228 changes: 228 additions & 0 deletions internal/dockerlog/dockerlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Package dockerlog provides Docker container log parser.
package dockerlog

import (
"context"
"strconv"
"strings"
"time"

"github.com/docker/docker/api/types"
apicontainer "github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/go-faster/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/otelstorage"
)

// Querier implements LogQL querier.
type Querier struct {
client client.APIClient
}

// NewQuerier creates new Querier.
func NewQuerier(c client.APIClient) (*Querier, error) {
return &Querier{
client: c,
}, nil
}

// Capabilities returns Querier capabilities.
// NOTE: engine would call once and then save value.
//
// Capabilities should not change over time.
func (q *Querier) Capabilities() (caps logqlengine.QuerierCapabilities) {
caps.Label.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe)
return caps
}

var _ logqlengine.Querier = (*Querier)(nil)

// Query creates new [InputNode].
func (q *Querier) Query(ctx context.Context, labels []logql.LabelMatcher) (logqlengine.PipelineNode, error) {
return &InputNode{
Lables: labels,
q: q,
}, nil
}

// InputNode is an input for LogQL pipeline using Docker API.
type InputNode struct {
Lables []logql.LabelMatcher

q *Querier
}

var _ logqlengine.PipelineNode = (*InputNode)(nil)

// Traverse implements [logqlengine.Node].
func (n *InputNode) Traverse(cb logqlengine.NodeVisitor) error {
return cb(n)
}

// EvalPipeline implements [logqlengine.PipelineNode].
func (n *InputNode) EvalPipeline(ctx context.Context, params logqlengine.EvalParams) (_ logqlengine.EntryIterator, rerr error) {
containers, err := n.q.fetchContainers(ctx, n.Lables)
if err != nil {
return nil, errors.Wrap(err, "fetch containers")
}
switch len(containers) {
case 0:
return iterators.Empty[logqlengine.Entry](), nil
case 1:
return n.q.openLog(ctx, containers[0], params.Start, params.End)
default:
iters := make([]logqlengine.EntryIterator, len(containers))
defer func() {
// Close all iterators in case of error.
if rerr != nil {
for _, iter := range iters {
if iter == nil {
continue
}
_ = iter.Close()
}
}
}()

// FIXME(tdakkota): errgroup cancels group context
// when Wait is done.
//
// It cancels request to Docker daemon, so we use query context to avoid this.
// As a result, openLog context would not be canceled in case of error.
var grp errgroup.Group
for idx, ctr := range containers {
idx, ctr := idx, ctr
grp.Go(func() error {
iter, err := n.q.openLog(ctx, ctr, params.Start, params.End)
if err != nil {
return errors.Wrapf(err, "open container %q log", ctr.ID)
}
iters[idx] = iter
return nil
})
}
if err := grp.Wait(); err != nil {
return nil, err
}
return newMergeIter(iters), nil
}
}

func (q *Querier) openLog(ctx context.Context, ctr container, start, end time.Time) (logqlengine.EntryIterator, error) {
var since, until string
if t := start; !t.IsZero() {
since = strconv.FormatInt(t.Unix(), 10)
}
if t := end; !t.IsZero() {
until = strconv.FormatInt(t.Unix(), 10)
}

rc, err := q.client.ContainerLogs(ctx, ctr.ID, apicontainer.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Since: since,
Until: until,
Timestamps: true,
Tail: "all",
})
if err != nil {
return nil, errors.Wrap(err, "query logs")
}
return ParseLog(rc, ctr.labels.AsResource()), nil
}

func (q *Querier) fetchContainers(ctx context.Context, labels []logql.LabelMatcher) (r []container, _ error) {
containers, err := q.client.ContainerList(ctx, apicontainer.ListOptions{
All: true,
// TODO(tdakkota): convert select params to label matchers.
})
if err != nil {
return nil, errors.Wrap(err, "query container list")
}

for _, ctr := range containers {
set := getLabels(ctr)
if set.Match(labels) {
r = append(r, container{
ID: ctr.ID,
labels: set,
})
}
}
return r, nil
}

type container struct {
ID string
labels containerLabels
}

type containerLabels struct {
labels map[string]string
}

func getLabels(ctr types.Container) containerLabels {
var name string
if len(ctr.Names) > 0 {
name = strings.TrimPrefix(ctr.Names[0], "/")
}
labels := map[string]string{
"container": name,
"container_id": ctr.ID,
"container_name": name,
"container_image": ctr.Image,
"container_image_id": ctr.ImageID,
"container_command": ctr.Command,
"container_created": strconv.FormatInt(ctr.Created, 10),
"container_state": ctr.State,
"container_status": ctr.Status,
}
for label, value := range ctr.Labels {
labels[otelstorage.KeyToLabel(label)] = value
}
return containerLabels{
labels: labels,
}
}

func (c containerLabels) Match(matchers []logql.LabelMatcher) bool {
for _, matcher := range matchers {
value, ok := c.labels[string(matcher.Label)]
if !ok {
return false
}
if !match(matcher, value) {
return false
}
}
return true
}

func (c containerLabels) AsResource() otelstorage.Attrs {
attrs := otelstorage.Attrs(pcommon.NewMap())
for key, value := range c.labels {
attrs.AsMap().PutStr(key, value)
}
return attrs
}

func match(m logql.LabelMatcher, s string) bool {
switch m.Op {
case logql.OpEq:
return s == m.Value
case logql.OpNotEq:
return s == m.Value
case logql.OpRe:
return m.Re.MatchString(s)
case logql.OpNotRe:
return !m.Re.MatchString(s)
default:
return false
}
}
Loading

0 comments on commit 74227df

Please sign in to comment.