From 74227df9d06723a9ba6a4088cc5cca9e3dddba1b Mon Sep 17 00:00:00 2001 From: tdakkota Date: Sun, 18 Aug 2024 03:13:40 +0300 Subject: [PATCH] feat(dockerlog): add package to query logs from Docker daemon --- internal/dockerlog/_testdata/dockerlog.bin | Bin 0 -> 1354 bytes internal/dockerlog/daemonlog.go | 120 +++++++++++ internal/dockerlog/daemonlog_test.go | 40 ++++ internal/dockerlog/dockerlog.go | 228 +++++++++++++++++++++ internal/dockerlog/merge_iter.go | 120 +++++++++++ internal/dockerlog/merge_iter_test.go | 51 +++++ 6 files changed, 559 insertions(+) create mode 100644 internal/dockerlog/_testdata/dockerlog.bin create mode 100644 internal/dockerlog/daemonlog.go create mode 100644 internal/dockerlog/daemonlog_test.go create mode 100644 internal/dockerlog/dockerlog.go create mode 100644 internal/dockerlog/merge_iter.go create mode 100644 internal/dockerlog/merge_iter_test.go diff --git a/internal/dockerlog/_testdata/dockerlog.bin b/internal/dockerlog/_testdata/dockerlog.bin new file mode 100644 index 0000000000000000000000000000000000000000..b1b81e19a9ae4e37040c0fd647ed11f2800442b3 GIT binary patch literal 1354 zcmchXzi!(w5XRGtd5VMQN}woNe++0UjJ8Wr*jkdJ2q^I^5uiwcq!joOyA_?f^;Jqa zPO{kXW=7j=a5@$A@2(&?F!H7 zf7+}oKs5=YRgcQS#Ef7bAY4GJ!wHS`VairVAthn%b6Pjo7Ybg+NlrP*dBC%r21%1- zK~0bZamIGZvMK7MSo*|f5)GC|=smoIq+uS$aCdt!^yc?zuVS*y^W>tTIVHuLp+fES zvzz|Pyf;d_*IPk;U9NxUxUwQ8#rusKlO&1K*FJs28tvS#!W-6Wvxi#h8aUd 0 { + name = strings.TrimPrefix(ctr.Names[0], "/") + } + labels := map[string]string{ + "container": name, + "container_id": ctr.ID, + "container_name": name, + "container_image": ctr.Image, + "container_image_id": ctr.ImageID, + "container_command": ctr.Command, + "container_created": strconv.FormatInt(ctr.Created, 10), + "container_state": ctr.State, + "container_status": ctr.Status, + } + for label, value := range ctr.Labels { + labels[otelstorage.KeyToLabel(label)] = value + } + return containerLabels{ + labels: labels, + } +} + +func (c containerLabels) Match(matchers []logql.LabelMatcher) bool { + for _, matcher := range matchers { + value, ok := c.labels[string(matcher.Label)] + if !ok { + return false + } + if !match(matcher, value) { + return false + } + } + return true +} + +func (c containerLabels) AsResource() otelstorage.Attrs { + attrs := otelstorage.Attrs(pcommon.NewMap()) + for key, value := range c.labels { + attrs.AsMap().PutStr(key, value) + } + return attrs +} + +func match(m logql.LabelMatcher, s string) bool { + switch m.Op { + case logql.OpEq: + return s == m.Value + case logql.OpNotEq: + return s == m.Value + case logql.OpRe: + return m.Re.MatchString(s) + case logql.OpNotRe: + return !m.Re.MatchString(s) + default: + return false + } +} diff --git a/internal/dockerlog/merge_iter.go b/internal/dockerlog/merge_iter.go new file mode 100644 index 00000000..54eb7dc1 --- /dev/null +++ b/internal/dockerlog/merge_iter.go @@ -0,0 +1,120 @@ +package dockerlog + +import ( + "container/heap" + + "go.uber.org/multierr" + + "github.com/go-faster/oteldb/internal/logql/logqlengine" +) + +type iterHeapElem struct { + iterIdx int + entry logqlengine.Entry +} + +func (a iterHeapElem) Less(b iterHeapElem) bool { + return a.entry.Timestamp < b.entry.Timestamp +} + +type iterHeap []iterHeapElem + +func (h iterHeap) Len() int { + return len(h) +} + +func (h iterHeap) Less(i, j int) bool { + return h[i].Less(h[j]) +} + +func (h iterHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *iterHeap) Push(x any) { + *h = append(*h, x.(iterHeapElem)) +} + +func (h *iterHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// mergeIter merges several iterators by timestamp. +type mergeIter struct { + iters []logqlengine.EntryIterator + heap iterHeap + initialized bool +} + +func newMergeIter(iters []logqlengine.EntryIterator) logqlengine.EntryIterator { + return &mergeIter{ + iters: iters, + } +} + +var _ logqlengine.EntryIterator = (*mergeIter)(nil) + +// Next returns true, if there is element and fills t. +func (i *mergeIter) Next(r *logqlengine.Entry) (ok bool) { + i.init() + if i.heap.Len() < 1 { + return false + } + + // Get min element from heap (record with smallest timestamp). + e := heap.Pop(&i.heap).(iterHeapElem) + *r = e.entry + + switch iter := i.iters[e.iterIdx]; { + case iter.Next(&e.entry): + // Peek next element from min iterator. + heap.Push(&i.heap, e) + return true + case iter.Err() != nil: + // Return an error, if read failed. + return false + default: + // heap.Pop removed drained iterator from heap. + return true + } +} + +func (i *mergeIter) init() { + if i.initialized { + return + } + i.initialized = true + + // Peek an element from each iterator to + // find min element. + var entry logqlengine.Entry + for idx, iter := range i.iters { + if !iter.Next(&entry) { + continue + } + heap.Push(&i.heap, iterHeapElem{ + iterIdx: idx, + entry: entry, + }) + } +} + +// Err returns an error caused during iteration, if any. +func (i *mergeIter) Err() (rerr error) { + for _, iter := range i.iters { + multierr.AppendInto(&rerr, iter.Err()) + } + return rerr +} + +// Close closes iterator. +func (i *mergeIter) Close() (rerr error) { + for _, iter := range i.iters { + multierr.AppendInto(&rerr, iter.Close()) + } + return rerr +} diff --git a/internal/dockerlog/merge_iter_test.go b/internal/dockerlog/merge_iter_test.go new file mode 100644 index 00000000..ea753159 --- /dev/null +++ b/internal/dockerlog/merge_iter_test.go @@ -0,0 +1,51 @@ +package dockerlog + +import ( + "fmt" + "slices" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/go-faster/oteldb/internal/iterators" + "github.com/go-faster/oteldb/internal/logql/logqlengine" + "github.com/go-faster/oteldb/internal/otelstorage" +) + +func TestMergeIter(t *testing.T) { + seriess := [][]otelstorage.Timestamp{ + {1, 5, 6}, + {2, 3, 7}, + {4, 8}, + } + + var ( + iters = make([]logqlengine.EntryIterator, len(seriess)) + expected []otelstorage.Timestamp + ) + // Build iterators from given timestamp series. + for i, series := range seriess { + elems := make([]logqlengine.Entry, len(series)) + for i, ts := range series { + elems[i] = logqlengine.Entry{ + Timestamp: ts, + Line: fmt.Sprintf("Message #%d", i), + } + expected = append(expected, ts) + } + iters[i] = iterators.Slice(elems) + } + // Expect a sorted list of timestamps. + slices.Sort(expected) + + var ( + iter = newMergeIter(iters) + record logqlengine.Entry + got []otelstorage.Timestamp + ) + for iter.Next(&record) { + got = append(got, record.Timestamp) + } + require.NoError(t, iter.Err()) + require.Equal(t, expected, got) +}