-
Notifications
You must be signed in to change notification settings - Fork 243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding the serialization features. #1666
Conversation
Don't be put off by the lines change count, 80% of that is generated code from msgp |
Tests are failing from a data race in the tests themselves since I am accessing them directly. Lemme see if I can fix that. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great work, I like the deduplication algo. I only did a first pass, I'm not yet familiar with the full picture
internal/component/prometheus/remote/queue/serialization/seralizer_test.go
Outdated
Show resolved
Hide resolved
Stats are protocol agnostic, so that if used in prometheus or otel environment they can add their own specific metrics and we dont define protocol specific in the lower level structs. The component in the last PR will expose prometheus compatible ones derived from the callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass. Looking good :)
@@ -141,7 +141,7 @@ lint: alloylint | |||
# final command runs tests for all other submodules. | |||
test: | |||
$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/) | |||
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker | |||
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd be running these tests twice, second time without -race
- I don't see the reason why, is that an accident?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is one test that will not be ran twice since I am accessing the var directly to test its value. The others will be ran, I could add the //go:build race to the others. Note most of our exclusions above have some tests that run twice.
internal/component/prometheus/remote/queue/serialization/appender.go
Outdated
Show resolved
Hide resolved
internal/component/prometheus/remote/queue/serialization/appender.go
Outdated
Show resolved
Hide resolved
ts.TS = t | ||
ts.Value = v | ||
ts.Hash = l.Hash() | ||
err := a.s.SendSeries(a.ctx, ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it guaranteed that ts
will be returned eventually to the object pool? Would we have a leak if, e.g. the component was removed from Alloy config? I don't see any issues, but would be nice to make this code a bit more clear that this is what's going on, with naming or comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be a required that all time series are returned. Though not in this PR this is checked in a future test via OutStandingTimeSeriesBinary
atomic int. There are end to end tests that ensure at the end of the test this is zero.
internal/component/prometheus/remote/queue/serialization/serializer.go
Outdated
Show resolved
Hide resolved
internal/component/prometheus/remote/queue/types/serialization.go
Outdated
Show resolved
Hide resolved
stringsSlice := make([]string, len(strMapToInt)) | ||
for stringValue, index := range strMapToInt { | ||
stringsSlice[index] = stringValue | ||
} | ||
group.Strings = stringsSlice |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stringsSlice := make([]string, len(strMapToInt)) | |
for stringValue, index := range strMapToInt { | |
stringsSlice[index] = stringValue | |
} | |
group.Strings = stringsSlice | |
dictionary := make([]string, len(strMapToInt)) | |
for stringValue, index := range strMapToInt { | |
dictionary[index] = stringValue | |
} | |
group.dictionary = dictionary |
I like to use the concept of dictionary here, or lookup table... it makes it easier to figure out what's going on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean to use an actual map? Or a rename like above?
} | ||
group.Strings = stringsSlice | ||
|
||
buf, err := group.MarshalMsg(s.msgpBuffer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sooo... is it worth it to do the dictionary stuff? I guess yes, but on the other hand I know that compression algos would do something similar automatically, snappy can refer to previous part of the data to reduce repetition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One second had a bug in my test re-evaluating.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright back with much more verifiable test.
//go:generate msgp
package main
import (
"fmt"
"math/rand"
"reflect"
"github.com/golang/snappy"
)
// 5 long really random
// 5371616
// 4732108
// 5 long half random
// 5050060
// 3929185
// 5 long quarter random
// 4455979
// 2918973
func main() {
metrics := make([]map[string]string, 0)
// 100k metrics with 10 labels each
for i := 0; i < 100_000; i++ {
metrics = append(metrics, getLabels())
}
ss := &StringString{Labels: metrics}
bb, err := ss.MarshalMsg(nil)
if err != nil {
panic(err)
}
out := snappy.Encode(nil, bb)
dc, _ := snappy.Decode(nil, out)
err = validateStringString(dc, metrics)
if err != nil {
panic(err)
}
println(fmt.Printf("dictionary based is %d bytes", len(out)))
ib := &IndexBased{
String: make([]string, 0),
Names: make([][]uint32, 0),
Values: make([][]uint32, 0),
}
alignIndexBased(ib, metrics)
bb, err = ib.MarshalMsg(nil)
if err != nil {
panic(err)
}
out = snappy.Encode(nil, bb)
dc, _ = snappy.Decode(nil, out)
err = validateIndexBased(dc, metrics)
println(fmt.Printf("index based is %d bytes", len(out)))
}
func validateStringString(bb []byte, metrics []map[string]string) error {
ss := &StringString{}
_, err := ss.UnmarshalMsg(bb)
if err != nil {
return err
}
for i, m := range metrics {
if !reflect.DeepEqual(ss.Labels[i], m) {
return fmt.Errorf("invalid metric at index %d", i)
}
}
return nil
}
func validateIndexBased(bb []byte, metrics []map[string]string) error {
ss := &IndexBased{}
_, err := ss.UnmarshalMsg(bb)
if err != nil {
return err
}
for i, m := range metrics {
if !reflect.DeepEqual(getMetric(ss.Names[i], ss.Values[i], ss.String), m) {
return fmt.Errorf("invalid metric at index %d", i)
}
}
return nil
}
func getMetric(names []uint32, values []uint32, strings []string) map[string]string {
metric := make(map[string]string)
for i, v := range names {
metric[strings[v]] = strings[values[i]]
}
return metric
}
func alignIndexBased(ib *IndexBased, strings []map[string]string) {
index := 0
stringsList := make(map[string]int)
for _, metric := range strings {
names := make([]uint32, 0)
values := make([]uint32, 0)
for k, v := range metric {
keyIndex, ok := stringsList[k]
if !ok {
stringsList[k] = index
ib.String = append(ib.String, k)
keyIndex = index
index++
}
valIndex, ok := stringsList[v]
if !ok {
stringsList[v] = index
ib.String = append(ib.String, v)
valIndex = index
index++
}
names = append(names, uint32(keyIndex))
values = append(values, uint32(valIndex))
}
ib.Names = append(ib.Names, names)
ib.Values = append(ib.Values, values)
}
ib.String = make([]string, len(stringsList))
for k, v := range stringsList {
ib.String[v] = k
}
}
func getLabels() map[string]string {
retLbls := make(map[string]string, 0)
for i := 0; i < 10; i++ {
retLbls[fmt.Sprintf("label_%d", i)] = randString()
}
return retLbls
}
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var halfRandom = []rune("abcdefghijklmnopqrstuvwxyz")
var quarterRandom = []rune("abcdefghijkl")
func randString() string {
b := make([]rune, rand.Intn(5))
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
type IndexBased struct {
Names [][]uint32
Values [][]uint32
String []string
}
type StringString struct {
Labels []map[string]string
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general the index based is never worse, and IMO in many cases is 60% of the size of the pure string based.
Results from the above test, changing out the letterRunes to small sets.
// 5 char long really random
// 5371616 string map
// 4732108 index based
// 5 long half random
// 5050060
// 3929185
// 5 long quarter random
// 4455979
// 2918973
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lower the cardinality the better but even in worse case its not terrible.
internal/component/prometheus/remote/queue/serialization/serializer_bench_test.go
Outdated
Show resolved
Hide resolved
…der.go Co-authored-by: Piotr <[email protected]>
…lizer.go Co-authored-by: Piotr <[email protected]>
Co-authored-by: Piotr <[email protected]>
…lizer.go Co-authored-by: Piotr <[email protected]>
Going to merge this and we can revisit any followup in the big merge on specific points. |
* readme * fix readme * Add filequeue functionality (#1601) * Checkin for file queue * add comment * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * naming and error handling feedback from PR * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/filequeue/filequeue.go Co-authored-by: Piotr <[email protected]> * drop benchmark * rename get to pop --------- Co-authored-by: Piotr <[email protected]> * Adding the serialization features. (#1666) * Adding the serialization features. * Dont test this with race condition since we access vars directly. * Fix test. * Fix typo in file name and return early in DeserializeToSeriesGroup. * Update internal/component/prometheus/remote/queue/serialization/appender.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/serialization/serializer.go Co-authored-by: Piotr <[email protected]> * Rename to indicate that TimeSeries are Put/Get from a pool. * Remove func that was about the same number of lines as inlining. * Update internal/component/prometheus/remote/queue/types/serialization.go Co-authored-by: Piotr <[email protected]> * Update internal/component/prometheus/remote/queue/serialization/serializer.go Co-authored-by: Piotr <[email protected]> * Change benchmark to be more specific. --------- Co-authored-by: Piotr <[email protected]> * Network wal pr (#1717) * Checkin the networking items. * Fix for config updating and tests. * Update internal/component/prometheus/remote/queue/network/loop.go Co-authored-by: William Dumont <[email protected]> * Update internal/component/prometheus/remote/queue/network/loop.go Co-authored-by: Piotr <[email protected]> * pr feedback * pr feedback * simplify stats * PR feedback --------- Co-authored-by: William Dumont <[email protected]> Co-authored-by: Piotr <[email protected]> * Component (#1823) * Checkin the networking items. * Fix for config updating and tests. * Update internal/component/prometheus/remote/queue/network/loop.go Co-authored-by: William Dumont <[email protected]> * Update internal/component/prometheus/remote/queue/network/loop.go Co-authored-by: Piotr <[email protected]> * pr feedback * pr feedback * simplify stats * simplify stats * Initial push. * docs and some renaming * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Changes and testing. * Update docs. * Update docs. * Fix race conditions in unit tests. * Tweaking unit tests. * lower threshold more. * lower threshold more. * Fix deadlock in manager tests. * rollback to previous * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Clayton Cornell <[email protected]> * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Paulin Todev <[email protected]> * Docs PR feedback * Update docs/sources/reference/components/prometheus/prometheus.remote.queue.md Co-authored-by: Piotr <[email protected]> * PR feedback * PR feedback * PR feedback * PR feedback * Fix typo * Fix typo * Fix bug. * Fix docs --------- Co-authored-by: William Dumont <[email protected]> Co-authored-by: Piotr <[email protected]> Co-authored-by: Clayton Cornell <[email protected]> Co-authored-by: Paulin Todev <[email protected]> * Change name to write instead of remote. * Fix issue. * Fix issue. * Dont depend on random sync.pool behavior. * small clarification on changelog. * PR feedback --------- Co-authored-by: Piotr <[email protected]> Co-authored-by: William Dumont <[email protected]> Co-authored-by: Clayton Cornell <[email protected]> Co-authored-by: Paulin Todev <[email protected]>
This adds the serialization side of converting series to the binary format. The binary format is time series where the strings are deduplicated with actual marshalling handled by the msgp library. I tested roughly 6 libraries listed here. Msgp hit the sweetspot of ease of use and size and features. Such as reusing arrays if they were passed in.