diff --git a/internal/dockerlog/_testdata/dockerlog.bin b/internal/dockerlog/_testdata/dockerlog.bin
new file mode 100644
index 00000000..b1b81e19
Binary files /dev/null and b/internal/dockerlog/_testdata/dockerlog.bin differ
diff --git a/internal/dockerlog/daemonlog.go b/internal/dockerlog/daemonlog.go
new file mode 100644
index 00000000..fde83915
--- /dev/null
+++ b/internal/dockerlog/daemonlog.go
@@ -0,0 +1,120 @@
+package dockerlog
+
+import (
+	"bytes"
+	"encoding/binary"
+	"io"
+	"strings"
+	"time"
+
+	"github.com/go-faster/errors"
+
+	"github.com/go-faster/oteldb/internal/logql/logqlengine"
+	"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
+	"github.com/go-faster/oteldb/internal/otelstorage"
+)
+
+// ParseLog parses a log stream from the Docker daemon.
+func ParseLog(f io.ReadCloser, resource otelstorage.Attrs) logqlengine.EntryIterator {
+	return &streamIter{
+		rd:       f,
+		err:      nil,
+		resource: resource,
+	}
+}
+
+const headerLen = 8
+
+type streamIter struct {
+	rd     io.ReadCloser
+	header [headerLen]byte
+	buf    bytes.Buffer
+	err    error
+
+	resource otelstorage.Attrs
+}
+
+var _ logqlengine.EntryIterator = (*streamIter)(nil)
+
+// Next returns true if there is an element and fills r.
+func (i *streamIter) Next(r *logqlengine.Entry) (ok bool) {
+	// Reset entry.
+	*r = logqlengine.Entry{
+		Set: logqlabels.NewLabelSet(),
+	}
+
+	ok, i.err = i.parseNext(r)
+	return ok
+}
+
+type stdType byte
+
+const (
+	// Stdin represents standard input stream type.
+	stdin stdType = iota
+	// Stdout represents standard output stream type.
+	stdout
+	// Stderr represents standard error stream type.
+	stderr
+	// Systemerr represents errors originating from the system that make it
+	// into the multiplexed stream.
+	systemerr
+)
+
+func (i *streamIter) parseNext(r *logqlengine.Entry) (bool, error) {
+	if _, err := io.ReadFull(i.rd, i.header[:]); err != nil {
+		switch err {
+		case io.EOF, io.ErrUnexpectedEOF:
+			// Handle missing header gracefully; docker-cli does the same thing.
+			return false, nil
+		default:
+			return false, errors.Wrap(err, "read header")
+		}
+	}
+
+	var (
+		typ       = stdType(i.header[0])
+		frameSize = binary.BigEndian.Uint32(i.header[4:8])
+	)
+	i.buf.Reset()
+	if _, err := io.CopyN(&i.buf, i.rd, int64(frameSize)); err != nil {
+		return false, errors.Wrap(err, "read message")
+	}
+	if typ == systemerr {
+		return false, errors.Errorf("daemon log stream error: %q", &i.buf)
+	}
+
+	if err := parseDockerLine(typ, i.buf.String(), r); err != nil {
+		return false, errors.Wrap(err, "parse log line")
+	}
+	r.Set.SetAttrs(i.resource)
+
+	return true, nil
+}
+
+func parseDockerLine(_ stdType, input string, r *logqlengine.Entry) error {
+	const dockerTimestampFormat = time.RFC3339Nano
+
+	rawTimestamp, line, ok := strings.Cut(input, " ")
+	if !ok {
+		return errors.New("invalid line: no space between timestamp and message")
+	}
+	r.Line = line
+
+	ts, err := time.Parse(dockerTimestampFormat, rawTimestamp)
+	if err != nil {
+		return errors.Wrap(err, "parse timestamp")
+	}
+	r.Timestamp = otelstorage.NewTimestampFromTime(ts)
+	return nil
+}
+
+// Err returns an error caused during iteration, if any.
+func (i *streamIter) Err() error {
+	return i.err
+}
+
+// Close closes iterator.
+func (i *streamIter) Close() error {
+	return i.rd.Close()
+}
diff --git a/internal/dockerlog/daemonlog_test.go b/internal/dockerlog/daemonlog_test.go
new file mode 100644
index 00000000..9e7dca3d
--- /dev/null
+++ b/internal/dockerlog/daemonlog_test.go
@@ -0,0 +1,40 @@
+package dockerlog
+
+import (
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+
+	"github.com/go-faster/oteldb/internal/logql/logqlengine"
+	"github.com/go-faster/oteldb/internal/otelstorage"
+)
+
+func TestParseLog(t *testing.T) {
+	f, err := os.Open("_testdata/dockerlog.bin")
+	require.NoError(t, err)
+	defer f.Close()
+
+	iter := ParseLog(f, otelstorage.Attrs(pcommon.NewMap()))
+	defer iter.Close()
+
+	expected := []logqlengine.Entry{
+		{Timestamp: 1707644252033031260, Line: "time=\"2024-02-11T09:37:32.032946602Z\" level=warning msg=\"No HTTP secret provided - generated random secret. This may cause problems with uploads if multiple registries are behind a load-balancer. To provide a shared secret, fill in http.secret in the configuration file or set the REGISTRY_HTTP_SECRET environment variable.\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
+		{Timestamp: 1707644252033058840, Line: "time=\"2024-02-11T09:37:32.032982092Z\" level=info msg=\"redis not configured\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
+		{Timestamp: 1707644252033079609, Line: "time=\"2024-02-11T09:37:32.03304416Z\" level=info msg=\"using inmemory blob descriptor cache\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
+		{Timestamp: 1707644252033097289, Line: "time=\"2024-02-11T09:37:32.03303576Z\" level=info msg=\"Starting upload purge in 4m0s\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
+		{Timestamp: 1707644252033198626, Line: "time=\"2024-02-11T09:37:32.033175887Z\" level=info msg=\"listening on [::]:5000\" go.version=go1.20.8 instance.id=3482d08d-d782-4c47-b0e0-37af45c9b495 service=registry version=2.8.3 \n"},
+	}
+
+	var (
+		r logqlengine.Entry
+		i int
+	)
+	for iter.Next(&r) {
+		require.Equal(t, expected[i].Timestamp, r.Timestamp)
+		require.Equal(t, expected[i].Line, r.Line)
+		i++
+	}
+	require.NoError(t, iter.Err())
+}
diff --git a/internal/dockerlog/dockerlog.go b/internal/dockerlog/dockerlog.go
new file mode 100644
index 00000000..c16e2b94
--- /dev/null
+++ b/internal/dockerlog/dockerlog.go
@@ -0,0 +1,228 @@
+// Package dockerlog provides Docker container log parser.
+package dockerlog
+
+import (
+	"context"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/docker/docker/api/types"
+	apicontainer "github.com/docker/docker/api/types/container"
+	"github.com/docker/docker/client"
+	"github.com/go-faster/errors"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/go-faster/oteldb/internal/iterators"
+	"github.com/go-faster/oteldb/internal/logql"
+	"github.com/go-faster/oteldb/internal/logql/logqlengine"
+	"github.com/go-faster/oteldb/internal/otelstorage"
+)
+
+// Querier implements LogQL querier.
+type Querier struct {
+	client client.APIClient
+}
+
+// NewQuerier creates new Querier.
+func NewQuerier(c client.APIClient) (*Querier, error) {
+	return &Querier{
+		client: c,
+	}, nil
+}
+
+// Capabilities returns Querier capabilities.
+// NOTE: the engine calls this once and caches the result.
+//
+// Capabilities should not change over time.
+func (q *Querier) Capabilities() (caps logqlengine.QuerierCapabilities) {
+	caps.Label.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe)
+	return caps
+}
+
+var _ logqlengine.Querier = (*Querier)(nil)
+
+// Query creates a new [InputNode].
+func (q *Querier) Query(ctx context.Context, labels []logql.LabelMatcher) (logqlengine.PipelineNode, error) {
+	return &InputNode{
+		Labels: labels,
+		q:      q,
+	}, nil
+}
+
+// InputNode is an input for the LogQL pipeline using the Docker API.
+type InputNode struct {
+	Labels []logql.LabelMatcher
+
+	q *Querier
+}
+
+var _ logqlengine.PipelineNode = (*InputNode)(nil)
+
+// Traverse implements [logqlengine.Node].
+func (n *InputNode) Traverse(cb logqlengine.NodeVisitor) error {
+	return cb(n)
+}
+
+// EvalPipeline implements [logqlengine.PipelineNode].
+func (n *InputNode) EvalPipeline(ctx context.Context, params logqlengine.EvalParams) (_ logqlengine.EntryIterator, rerr error) {
+	containers, err := n.q.fetchContainers(ctx, n.Labels)
+	if err != nil {
+		return nil, errors.Wrap(err, "fetch containers")
+	}
+	switch len(containers) {
+	case 0:
+		return iterators.Empty[logqlengine.Entry](), nil
+	case 1:
+		return n.q.openLog(ctx, containers[0], params.Start, params.End)
+	default:
+		iters := make([]logqlengine.EntryIterator, len(containers))
+		defer func() {
+			// Close all iterators in case of error.
+			if rerr != nil {
+				for _, iter := range iters {
+					if iter == nil {
+						continue
+					}
+					_ = iter.Close()
+				}
+			}
+		}()
+
+		// FIXME(tdakkota): errgroup cancels the group context
+		// when Wait is done.
+		//
+		// That cancels the request to the Docker daemon, so we use the query context to avoid it.
+		// As a result, the openLog context would not be canceled in case of error.
+		var grp errgroup.Group
+		for idx, ctr := range containers {
+			idx, ctr := idx, ctr
+			grp.Go(func() error {
+				iter, err := n.q.openLog(ctx, ctr, params.Start, params.End)
+				if err != nil {
+					return errors.Wrapf(err, "open container %q log", ctr.ID)
+				}
+				iters[idx] = iter
+				return nil
+			})
+		}
+		if err := grp.Wait(); err != nil {
+			return nil, err
+		}
+		return newMergeIter(iters), nil
+	}
+}
+
+func (q *Querier) openLog(ctx context.Context, ctr container, start, end time.Time) (logqlengine.EntryIterator, error) {
+	var since, until string
+	if t := start; !t.IsZero() {
+		since = strconv.FormatInt(t.Unix(), 10)
+	}
+	if t := end; !t.IsZero() {
+		until = strconv.FormatInt(t.Unix(), 10)
+	}
+
+	rc, err := q.client.ContainerLogs(ctx, ctr.ID, apicontainer.LogsOptions{
+		ShowStdout: true,
+		ShowStderr: true,
+		Since:      since,
+		Until:      until,
+		Timestamps: true,
+		Tail:       "all",
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "query logs")
+	}
+	return ParseLog(rc, ctr.labels.AsResource()), nil
+}
+
+func (q *Querier) fetchContainers(ctx context.Context, labels []logql.LabelMatcher) (r []container, _ error) {
+	containers, err := q.client.ContainerList(ctx, apicontainer.ListOptions{
+		All: true,
+		// TODO(tdakkota): convert select params to label matchers.
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "query container list")
+	}
+
+	for _, ctr := range containers {
+		set := getLabels(ctr)
+		if set.Match(labels) {
+			r = append(r, container{
+				ID:     ctr.ID,
+				labels: set,
+			})
+		}
+	}
+	return r, nil
+}
+
+type container struct {
+	ID     string
+	labels containerLabels
+}
+
+type containerLabels struct {
+	labels map[string]string
+}
+
+func getLabels(ctr types.Container) containerLabels {
+	var name string
+	if len(ctr.Names) > 0 {
+		name = strings.TrimPrefix(ctr.Names[0], "/")
+	}
+	labels := map[string]string{
+		"container":          name,
+		"container_id":       ctr.ID,
+		"container_name":     name,
+		"container_image":    ctr.Image,
+		"container_image_id": ctr.ImageID,
+		"container_command":  ctr.Command,
+		"container_created":  strconv.FormatInt(ctr.Created, 10),
+		"container_state":    ctr.State,
+		"container_status":   ctr.Status,
+	}
+	for label, value := range ctr.Labels {
+		labels[otelstorage.KeyToLabel(label)] = value
+	}
+	return containerLabels{
+		labels: labels,
+	}
+}
+
+func (c containerLabels) Match(matchers []logql.LabelMatcher) bool {
+	for _, matcher := range matchers {
+		value, ok := c.labels[string(matcher.Label)]
+		if !ok {
+			return false
+		}
+		if !match(matcher, value) {
+			return false
+		}
+	}
+	return true
+}
+
+func (c containerLabels) AsResource() otelstorage.Attrs {
+	attrs := otelstorage.Attrs(pcommon.NewMap())
+	for key, value := range c.labels {
+		attrs.AsMap().PutStr(key, value)
+	}
+	return attrs
+}
+
+func match(m logql.LabelMatcher, s string) bool {
+	switch m.Op {
+	case logql.OpEq:
+		return s == m.Value
+	case logql.OpNotEq:
+		return s != m.Value
+	case logql.OpRe:
+		return m.Re.MatchString(s)
+	case logql.OpNotRe:
+		return !m.Re.MatchString(s)
+	default:
+		return false
+	}
+}
diff --git a/internal/dockerlog/merge_iter.go b/internal/dockerlog/merge_iter.go
new file mode 100644
index 00000000..54eb7dc1
--- /dev/null
+++ b/internal/dockerlog/merge_iter.go
@@ -0,0 +1,120 @@
+package dockerlog
+
+import (
+	"container/heap"
+
+	"go.uber.org/multierr"
+
+	"github.com/go-faster/oteldb/internal/logql/logqlengine"
+)
+
+type iterHeapElem struct {
+	iterIdx int
+	entry   logqlengine.Entry
+}
+
+func (a iterHeapElem) Less(b iterHeapElem) bool {
+	return a.entry.Timestamp < b.entry.Timestamp
+}
+
+type iterHeap []iterHeapElem
+
+func (h iterHeap) Len() int {
+	return len(h)
+}
+
+func (h iterHeap) Less(i, j int) bool {
+	return h[i].Less(h[j])
+}
+
+func (h iterHeap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}
+
+func (h *iterHeap) Push(x any) {
+	*h = append(*h, x.(iterHeapElem))
+}
+
+func (h *iterHeap) Pop() any {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+// mergeIter merges several iterators by timestamp.
+type mergeIter struct {
+	iters       []logqlengine.EntryIterator
+	heap        iterHeap
+	initialized bool
+}
+
+func newMergeIter(iters []logqlengine.EntryIterator) logqlengine.EntryIterator {
+	return &mergeIter{
+		iters: iters,
+	}
+}
+
+var _ logqlengine.EntryIterator = (*mergeIter)(nil)
+
+// Next returns true if there is an element and fills r.
+func (i *mergeIter) Next(r *logqlengine.Entry) (ok bool) {
+	i.init()
+	if i.heap.Len() < 1 {
+		return false
+	}
+
+	// Get min element from heap (record with smallest timestamp).
+	e := heap.Pop(&i.heap).(iterHeapElem)
+	*r = e.entry
+
+	switch iter := i.iters[e.iterIdx]; {
+	case iter.Next(&e.entry):
+		// Advance that iterator and push its next element onto the heap.
+		heap.Push(&i.heap, e)
+		return true
+	case iter.Err() != nil:
+		// Stop if the read failed; the error is reported by Err.
+		return false
+	default:
+		// heap.Pop removed drained iterator from heap.
+		return true
+	}
+}
+
+func (i *mergeIter) init() {
+	if i.initialized {
+		return
+	}
+	i.initialized = true
+
+	// Peek an element from each iterator to
+	// find min element.
+	var entry logqlengine.Entry
+	for idx, iter := range i.iters {
+		if !iter.Next(&entry) {
+			continue
+		}
+		heap.Push(&i.heap, iterHeapElem{
+			iterIdx: idx,
+			entry:   entry,
+		})
+	}
+}
+
+// Err returns an error caused during iteration, if any.
+func (i *mergeIter) Err() (rerr error) {
+	for _, iter := range i.iters {
+		multierr.AppendInto(&rerr, iter.Err())
+	}
+	return rerr
+}
+
+// Close closes iterator.
+func (i *mergeIter) Close() (rerr error) {
+	for _, iter := range i.iters {
+		multierr.AppendInto(&rerr, iter.Close())
+	}
+	return rerr
+}
diff --git a/internal/dockerlog/merge_iter_test.go b/internal/dockerlog/merge_iter_test.go
new file mode 100644
index 00000000..ea753159
--- /dev/null
+++ b/internal/dockerlog/merge_iter_test.go
@@ -0,0 +1,51 @@
+package dockerlog
+
+import (
+	"fmt"
+	"slices"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/go-faster/oteldb/internal/iterators"
+	"github.com/go-faster/oteldb/internal/logql/logqlengine"
+	"github.com/go-faster/oteldb/internal/otelstorage"
+)
+
+func TestMergeIter(t *testing.T) {
+	seriess := [][]otelstorage.Timestamp{
+		{1, 5, 6},
+		{2, 3, 7},
+		{4, 8},
+	}
+
+	var (
+		iters    = make([]logqlengine.EntryIterator, len(seriess))
+		expected []otelstorage.Timestamp
+	)
+	// Build iterators from given timestamp series.
+	for i, series := range seriess {
+		elems := make([]logqlengine.Entry, len(series))
+		for i, ts := range series {
+			elems[i] = logqlengine.Entry{
+				Timestamp: ts,
+				Line:      fmt.Sprintf("Message #%d", i),
+			}
+			expected = append(expected, ts)
+		}
+		iters[i] = iterators.Slice(elems)
+	}
+	// Expect a sorted list of timestamps.
+	slices.Sort(expected)
+
+	var (
+		iter   = newMergeIter(iters)
+		record logqlengine.Entry
+		got    []otelstorage.Timestamp
+	)
+	for iter.Next(&record) {
+		got = append(got, record.Timestamp)
+	}
+	require.NoError(t, iter.Err())
+	require.Equal(t, expected, got)
+}
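Usage sketch: a minimal example of how the new Querier might be wired up and read directly. This is illustrative only — the packages are internal to the oteldb module, the container name "registry" is arbitrary, and the EvalParams and LabelMatcher field names are assumed from their usage in the diff above; the real call sites may differ.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/docker/docker/client"

	"github.com/go-faster/oteldb/internal/dockerlog"
	"github.com/go-faster/oteldb/internal/logql"
	"github.com/go-faster/oteldb/internal/logql/logqlengine"
)

func main() {
	ctx := context.Background()

	// Docker client from the environment (DOCKER_HOST or the default socket).
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		panic(err)
	}

	q, err := dockerlog.NewQuerier(cli)
	if err != nil {
		panic(err)
	}

	// Select logs of a single container by its "container" label.
	node, err := q.Query(ctx, []logql.LabelMatcher{
		{Label: "container", Op: logql.OpEq, Value: "registry"},
	})
	if err != nil {
		panic(err)
	}

	// Read the last hour of logs; Start/End are assumed fields of EvalParams.
	iter, err := node.EvalPipeline(ctx, logqlengine.EvalParams{
		Start: time.Now().Add(-time.Hour),
		End:   time.Now(),
	})
	if err != nil {
		panic(err)
	}
	defer iter.Close()

	var e logqlengine.Entry
	for iter.Next(&e) {
		fmt.Println(e.Timestamp, e.Line)
	}
	if err := iter.Err(); err != nil {
		panic(err)
	}
}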