Skip to content

Commit

Permalink
Move wal queue to its own repository and minor bug fixes. (grafana#1994)
Browse files Browse the repository at this point in the history
* Working on wal cleanup

* use wrapped mailboxes

* more safely use mailboxes.

* Fix check that is no longer needed.

* Cleanup

* Add test for metrics.

* remove check

* fix check

* lower threshold

* The go func was bogging down tests.

* Adjusting times.

* Fix issue with items not being put back into the timeseries pool.

* Add comment.

* Remove unneeded test.

* Use the same concepts.

* Use the same concepts.

* Switch to using the walqueue repo.

* add changelog

* Fix go.mod and empty file.

* update go.mod

* remove unneeded file and update to cleaner code for walqueue.

* Remove race exclusion

* Remove race exclusion

* Update CHANGELOG.md

Co-authored-by: Piotr <[email protected]>

---------

Co-authored-by: Piotr <[email protected]>
  • Loading branch information
2 people authored and vaxvms committed Nov 20, 2024
1 parent 014bab3 commit 9b5a383
Show file tree
Hide file tree
Showing 35 changed files with 63 additions and 7,622 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Main (unreleased)

- Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa)

- Fixed issue with reloading configuration and prometheus metrics duplication in `prometheus.write.queue`. (@mattdurham)

### Other changes

- Change the stability of the `livedebugging` feature from "experimental" to "generally available". (@wildum)
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ lint: alloylint
# final command runs tests for all other submodules.
test:
$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/)
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/write/queue/serialization ./internal/component/prometheus/write/queue/network
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker
$(GO_ENV) find . -name go.mod -not -path "./go.mod" -execdir go test -race ./... \;

test-packages:
Expand Down
14 changes: 9 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0
github.com/grafana/vmware_exporter v0.0.5-beta
github.com/grafana/walqueue v0.0.0-20241114193920-da8174120940
github.com/hashicorp/consul/api v1.29.5
github.com/hashicorp/go-discover v0.0.0-20230724184603-e89ebd1b2f65
github.com/hashicorp/go-multierror v1.1.1
Expand Down Expand Up @@ -165,7 +166,7 @@ require (
github.com/prometheus/mysqld_exporter v0.14.0
github.com/prometheus/node_exporter v1.6.0
github.com/prometheus/procfs v0.15.1
github.com/prometheus/prometheus v0.54.1 // a.k.a. v2.51.2
github.com/prometheus/prometheus v0.55.1 // a.k.a. v2.51.2
github.com/prometheus/snmp_exporter v0.26.0 // if you update the snmp_exporter version, make sure to update the SNMP_VERSION in _index
github.com/prometheus/statsd_exporter v0.22.8
github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052
Expand Down Expand Up @@ -468,7 +469,7 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/efficientgo/core v1.0.0-rc.2 // indirect
Expand Down Expand Up @@ -744,15 +745,15 @@ require (
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect
github.com/tencentcloud/tencentcloud-sdk-go v1.0.162 // indirect
github.com/tg123/go-htpasswd v1.2.2 // indirect
github.com/tinylib/msgp v1.2.2
github.com/tinylib/msgp v1.2.4 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/vertica/vertica-sql-go v1.3.3 // indirect
github.com/vishvananda/netlink v1.2.1-beta.2 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
github.com/vladopajic/go-actor v0.9.0
github.com/vladopajic/go-actor v0.9.0 // indirect
github.com/vmware/govmomi v0.44.1 // indirect
github.com/vultr/govultr/v2 v2.17.2 // indirect
github.com/willf/bitset v1.1.11 // indirect
Expand Down Expand Up @@ -789,7 +790,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0 // indirect
go.opentelemetry.io/otel/log v0.7.0 // indirect
Expand Down Expand Up @@ -942,3 +943,6 @@ exclude (
)

replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.12.0

// This is to handle issues witn synchronous mailbox and closing channels.
replace github.com/vladopajic/go-actor => github.com/grafana/go-actor v0.0.0-20241113133736-e18c4a5c12f4
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,8 @@ github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb h1:AWE6+kvtE18HP+lRWNUCyvymyrFSXs6TcS2vXIXGIuw=
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb/go.mod h1:kkWM4WUV230bNG3urVRWPBnSJHs64y/0RmWjftnnn0c=
github.com/grafana/go-actor v0.0.0-20241113133736-e18c4a5c12f4 h1:jid0h8vbKxOfHbVu/5exi6fz2y9/vKmtcKtTfuXElMY=
github.com/grafana/go-actor v0.0.0-20241113133736-e18c4a5c12f4/go.mod h1:b4thGZ60fnjC3TaJ4XeCN+uZXM+ec27t3ibqFfd8iAk=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/go-offsets-tracker v0.1.7 h1:2zBQ7iiGzvyXY7LA8kaaSiEqH/Yx82UcfRabbY5aOG4=
Expand Down Expand Up @@ -1276,6 +1278,8 @@ github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPF
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU=
github.com/grafana/vmware_exporter v0.0.5-beta h1:2JCqzIWJzns8FN78wPsueC9rT3e3kZo2OUoL5kGMjdM=
github.com/grafana/vmware_exporter v0.0.5-beta/go.mod h1:1CecUZII0zVsVcHtNfNeTTcxK7EksqAsAn/TCCB0Mh4=
github.com/grafana/walqueue v0.0.0-20241114193920-da8174120940 h1:g086EMuMz94kliAaT5RanZ+R/wp5JdD4MZdoCWg0oDQ=
github.com/grafana/walqueue v0.0.0-20241114193920-da8174120940/go.mod h1:vaxO1V0q1dptHEiTIMW1krRy+aehkYyC2YGrKPyGxHY=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 h1:FlKQKUYPZ5yDCN248M3R7x8yu2E3yEZ0H7aLomE4EoE=
github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445/go.mod h1:L69/dBlPQlWkcnU76WgcppK5e4rrxzQdi6LhLnK/ytA=
Expand Down Expand Up @@ -2446,8 +2450,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tilinna/clock v1.1.0 h1:6IQQQCo6KoBxVudv6gwtY8o4eDfhHo8ojA5dP0MfhSs=
github.com/tilinna/clock v1.1.0/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tinylib/msgp v1.2.2 h1:iHiBE1tJQwFI740SPEPkGE8cfhNfrqOYRlH450BnC/4=
github.com/tinylib/msgp v1.2.2/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
github.com/tinylib/msgp v1.2.4 h1:yLFeUGostXXSGW5vxfT5dXG/qzkn4schv2I7at5+hVU=
github.com/tinylib/msgp v1.2.4/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
Expand Down Expand Up @@ -2488,8 +2492,6 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/vjeantet/grok v1.0.0/go.mod h1:/FWYEVYekkm+2VjcFmO9PufDU5FgXHUz9oy2EGqmQBo=
github.com/vladopajic/go-actor v0.9.0 h1:fFj5RDGo4YZ6XCx2BWCThx/efOGRwokTpsc3CWHVEIU=
github.com/vladopajic/go-actor v0.9.0/go.mod h1:CKVRXStfjEIi7K74SyFQv/KfM8a/Po57bxmbBGv9YwE=
github.com/vmihailenco/msgpack/v4 v4.3.13 h1:A2wsiTbvp63ilDaWmsk2wjx6xZdxQOvpiNlKBGKKXKI=
github.com/vmihailenco/msgpack/v4 v4.3.13/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,3 @@ func TestUnmarshalDatadogLogsConfig(t *testing.T) {
})
}
}

78 changes: 33 additions & 45 deletions internal/component/prometheus/write/queue/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ import (

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/prometheus/write/queue/filequeue"
"github.com/grafana/alloy/internal/component/prometheus/write/queue/network"
"github.com/grafana/alloy/internal/component/prometheus/write/queue/serialization"
"github.com/grafana/alloy/internal/component/prometheus/write/queue/types"
"github.com/grafana/alloy/internal/featuregate"
"github.com/prometheus/client_golang/prometheus"
promqueue "github.com/grafana/walqueue/implementations/prometheus"
"github.com/prometheus/prometheus/storage"
)

Expand All @@ -34,7 +30,7 @@ func NewComponent(opts component.Options, args Arguments) (*Queue, error) {
opts: opts,
args: args,
log: opts.Logger,
endpoints: map[string]*endpoint{},
endpoints: map[string]promqueue.Queue{},
}

err := s.createEndpoints()
Expand All @@ -58,7 +54,7 @@ type Queue struct {
args Arguments
opts component.Options
log log.Logger
endpoints map[string]*endpoint
endpoints map[string]promqueue.Queue
}

// Run starts the component, blocking until ctx is canceled or the component
Expand Down Expand Up @@ -90,60 +86,52 @@ func (s *Queue) Update(args component.Arguments) error {
defer s.mut.Unlock()

newArgs := args.(Arguments)
sync.OnceFunc(func() {
s.opts.OnStateChange(Exports{Receiver: s})
})
// If they are the same do nothing.
if reflect.DeepEqual(newArgs, s.args) {
return nil
}
s.args = newArgs
// TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start.
// This will cause data in the endpoints and their children to be lost.
if len(s.endpoints) > 0 {
for _, ep := range s.endpoints {
// Figure out which endpoint is new, which is updated, and which needs to be gone.
// So add all the endpoints and then if they are in the new config then remove them from deletable.
deletableEndpoints := make(map[string]struct{})
for k := range s.endpoints {
deletableEndpoints[k] = struct{}{}
}

for _, epCfg := range s.args.Endpoints {
delete(deletableEndpoints, epCfg.Name)
ep, found := s.endpoints[epCfg.Name]
// If found stop and recreate.
if found {
// Stop and loose all the signals in the queue.
// TODO drain the signals and re-add them
ep.Stop()
}
s.endpoints = map[string]*endpoint{}
}
err := s.createEndpoints()
if err != nil {
return err
nativeCfg := epCfg.ToNativeType()
// Create
end, err := promqueue.NewQueue(epCfg.Name, nativeCfg, filepath.Join(s.opts.DataPath, epCfg.Name, "wal"), uint32(s.args.Persistence.MaxSignalsToBatch), s.args.Persistence.BatchInterval, s.args.TTL, s.opts.Registerer, "alloy", s.opts.Logger)
if err != nil {
return err
}
end.Start()
s.endpoints[epCfg.Name] = end

}
for _, ep := range s.endpoints {
ep.Start()
// Now we need to figure out the endpoints that were not touched and able to be deleted.
for name := range deletableEndpoints {
s.endpoints[name].Stop()
delete(s.endpoints, name)
}
return nil
}

func (s *Queue) createEndpoints() error {
// @mattdurham not in love with this code.
for _, ep := range s.args.Endpoints {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"endpoint": ep.Name}, s.opts.Registerer)
stats := types.NewStats("alloy", "queue_series", reg)
stats.SeriesBackwardsCompatibility(reg)
meta := types.NewStats("alloy", "queue_metadata", reg)
meta.MetaBackwardsCompatibility(reg)
cfg := ep.ToNativeType()
client, err := network.New(cfg, s.log, stats.UpdateNetwork, meta.UpdateNetwork)
if err != nil {
return err
}
end := NewEndpoint(client, nil, s.args.TTL, s.opts.Logger)
fq, err := filequeue.NewQueue(filepath.Join(s.opts.DataPath, ep.Name, "wal"), func(ctx context.Context, dh types.DataHandle) {
_ = end.incoming.Send(ctx, dh)
}, s.opts.Logger)
if err != nil {
return err
}
serial, err := serialization.NewSerializer(types.SerializerConfig{
MaxSignalsInBatch: uint32(s.args.Persistence.MaxSignalsToBatch),
FlushFrequency: s.args.Persistence.BatchInterval,
}, fq, stats.UpdateSerializer, s.opts.Logger)
nativeCfg := ep.ToNativeType()
end, err := promqueue.NewQueue(ep.Name, nativeCfg, filepath.Join(s.opts.DataPath, ep.Name, "wal"), uint32(s.args.Persistence.MaxSignalsToBatch), s.args.Persistence.BatchInterval, s.args.TTL, s.opts.Registerer, "alloy", s.opts.Logger)
if err != nil {
return err
}
end.serializer = serial
s.endpoints[ep.Name] = end
}
return nil
Expand All @@ -158,7 +146,7 @@ func (c *Queue) Appender(ctx context.Context) storage.Appender {

children := make([]storage.Appender, 0)
for _, ep := range c.endpoints {
children = append(children, serialization.NewAppender(ctx, c.args.TTL, ep.serializer, c.opts.Logger))
children = append(children, ep.Appender(ctx))
}
return &fanout{children: children}
}
9 changes: 5 additions & 4 deletions internal/component/prometheus/write/queue/e2e_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,7 @@ func runE2eStats(t *testing.T, test statsTest) {
}
require.NoError(t, app.Commit())
}()
tm := time.NewTimer(8 * time.Second)
<-tm.C
cancel()

time.Sleep(5 * time.Second)
require.Eventually(t, func() bool {
dtos, gatherErr := reg.Gather()
require.NoError(t, gatherErr)
Expand All @@ -632,9 +629,13 @@ func runE2eStats(t *testing.T, test statsTest) {
// Make sure we have a few metrics.
return found > 1
}, 10*time.Second, 1*time.Second)

metrics := make(map[string]float64)
dtos, err := reg.Gather()
require.NoError(t, err)
// Cancel needs to be here since it will unregister the metrics.
cancel()

// Get the value of metrics.
for _, d := range dtos {
metrics[*d.Name] = getValue(d)
Expand Down
7 changes: 5 additions & 2 deletions internal/component/prometheus/write/queue/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

"github.com/golang/snappy"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/prometheus/write/queue/types"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/walqueue/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
Expand Down Expand Up @@ -155,6 +155,7 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo
require.NoError(t, err)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

go func() {
runErr := c.Run(ctx)
require.NoError(t, runErr)
Expand All @@ -178,6 +179,7 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo
require.NoError(t, app.Commit())
}()
}

// This is a weird use case to handle eventually.
// With race turned on this can take a long time.
tm := time.NewTimer(20 * time.Second)
Expand All @@ -186,6 +188,7 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo
case <-tm.C:
require.Truef(t, false, "failed to collect signals in the appropriate time")
}

cancel()

for i := 0; i < samples.Len(); i++ {
Expand Down Expand Up @@ -213,7 +216,7 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo
}
require.Eventuallyf(t, func() bool {
return types.OutStandingTimeSeriesBinary.Load() == 0
}, 2*time.Second, 100*time.Millisecond, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load())
}, 20*time.Second, 1*time.Second, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load())
}

func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb.TimeSeries, []prompb.MetricMetadata) {
Expand Down
Loading

0 comments on commit 9b5a383

Please sign in to comment.