Add experimental support to write incoming data to a Kafka-compatible backend #6888
Conversation
Mostly LGTM, I have one more important remark about the timeouts, but otherwise it looks good. I quite liked `TestWriter_WriteSync`.
pkg/storage/ingest/writer.go
Outdated
// after being sent on the network. The actual timeout is increased by the configured overhead.
kgo.RecordRetries(math.MaxInt64),
kgo.RecordDeliveryTimeout(w.writerCfg.KafkaWriteTimeout),
kgo.ProduceRequestTimeout(w.writerCfg.KafkaWriteTimeout),
Shouldn't we set this to something lower? Otherwise I can see how a network timeout would result in no retries:
- try to send
- wait for `ProduceRequestTimeout` (== `w.writerCfg.KafkaWriteTimeout`) - time out
- try to send again
- fail, it's already past `RecordDeliveryTimeout`

Should we set it to something like `w.writerCfg.KafkaWriteTimeout / 3`?
Discussed offline.

The TL;DR is that it depends on the actual failure scenario.

If the backend is slow, then a higher `ProduceRequestTimeout` increases the chances of a successful request (in other words, one try of 10s is better than 2x 5s tries, because maybe just waiting longer fixes it).

If the backend is unhealthy, then a shorter `ProduceRequestTimeout` is better because the client will retry within the `RecordDeliveryTimeout`. However, the retry will succeed only if, in the meantime, the cluster metadata has been updated and the replica owning a given partition has been moved to another broker; otherwise the client will just keep trying to connect to the previous (unhealthy) one. In a setup where these timeouts are relatively low, it may not be that common that the unhealthy replica has actually been detected as unhealthy and the replica owner for a given partition moved.
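To make the trade-off concrete, here is a minimal sketch (not the PR's actual code) of the two configurations discussed above, using franz-go's `kgo` options; the `writeTimeout` parameter, the `favorRetries` switch, and the `/3` divisor are assumptions taken from this thread.

```go
package ingest

import (
	"math"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produceTimeoutOpts is a hypothetical helper illustrating the two choices
// discussed above; it is not part of this PR.
func produceTimeoutOpts(writeTimeout time.Duration, favorRetries bool) []kgo.Opt {
	opts := []kgo.Opt{
		// Retry records indefinitely; RecordDeliveryTimeout is the overall
		// budget for a record across all attempts.
		kgo.RecordRetries(math.MaxInt64),
		kgo.RecordDeliveryTimeout(writeTimeout),
	}

	if favorRetries {
		// Unhealthy-backend bias: a shorter per-request timeout leaves room
		// for retries within the overall delivery timeout.
		opts = append(opts, kgo.ProduceRequestTimeout(writeTimeout/3))
	} else {
		// Slow-backend bias: a single attempt gets the full budget, so one
		// longer try can succeed where two shorter tries would not.
		opts = append(opts, kgo.ProduceRequestTimeout(writeTimeout))
	}
	return opts
}
```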
received := mimirpb.WriteRequest{}
require.NoError(t, received.Unmarshal(fetches.Records()[0].Value))
require.Len(t, received.Timeseries, len(multiSeries))

for idx, expected := range multiSeries {
	assert.Equal(t, expected.Labels, received.Timeseries[idx].Labels)
	assert.Equal(t, expected.Samples, received.Timeseries[idx].Samples)
}
Reading these tests, I'm not sure whether the interface of `Writer` shouldn't just accept a byte slice instead of managing Mimir's protocol buffers.
I don't insist on this, but I thought it might give better separation and reduce scope a little bit.
My take is: the byte-level interface is the Kafka client. Our Writer has a domain-level interface, which means we write domain-level data structures (so timeseries & co).
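For illustration of the two shapes being weighed here, a rough sketch; both interface names and the parameter lists are assumptions made up for the comparison, not the PR's actual signatures.

```go
package ingest

import (
	"context"

	"github.com/grafana/mimir/pkg/mimirpb"
)

// Byte-level shape: the caller serializes and the writer only moves bytes.
// This is essentially what the Kafka client itself already offers.
type byteWriter interface {
	WriteSync(ctx context.Context, partitionID int32, userID string, data []byte) error
}

// Domain-level shape (the direction argued for above): callers hand over
// Mimir's own data structures and serialization stays inside the writer.
type seriesWriter interface {
	WriteSync(ctx context.Context, partitionID int32, userID string, req *mimirpb.WriteRequest) error
}
```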
… backend
Signed-off-by: Marco Pracucci <[email protected]>
Force-pushed from 641cda1 to 95b49c7
LGTM, thanks for addressing my comments
// By default, the Kafka client allows 1 in-flight Produce request per broker. By disabling write idempotency
// (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher
// number of in-flight requests, in addition to short buffering ("linger") on the client side before firing the
// next Produce request, allows us to reduce the end-to-end latency.
//
// The product of the producer linger and the max in-flight requests should match the maximum Produce latency
// expected from the Kafka backend in a steady state. For example, 50ms * 20 requests = 1s, which means the
// Kafka client will keep issuing a Produce request every 50ms as long as the Kafka backend doesn't take longer
// than 1s to process them (if it takes longer, the client will buffer data and stop issuing new Produce
// requests until some previous ones complete).
kgo.DisableIdempotentWrite(),
I think this is true in other clients, but in franz-go, disabling idempotency forces the client to only issue one request at a time -- franz-go favors not duplicating data. With idempotency, franz-go allows 5 requests per broker (technically 4 due to some internal accounting but it's close enough).
Oh wait, I missed `MaxProduceRequestsInflightPerBroker` just two lines down, my mistake. Nice!
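Tying this thread together, a minimal sketch of how these options might be combined; the 50ms linger and 20 in-flight requests are the example numbers from the comment above, not necessarily the values used in the PR.

```go
package ingest

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produceThroughputOpts is a hypothetical helper, shown only to illustrate
// the linger / in-flight trade-off discussed above.
func produceThroughputOpts() []kgo.Opt {
	return []kgo.Opt{
		// We don't need idempotent writes, and disabling them lets us raise
		// the number of in-flight Produce requests per broker.
		kgo.DisableIdempotentWrite(),
		kgo.MaxProduceRequestsInflightPerBroker(20),

		// Buffer records briefly on the client side, so a Produce request is
		// issued roughly every linger interval: 50ms * 20 in-flight ≈ 1s of
		// expected steady-state Produce latency on the backend.
		kgo.ProducerLinger(50 * time.Millisecond),
	}
}
```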
Thanks for looking at it and for your feedback!
What this PR does
We're building a prototype of an alternative Mimir architecture where the write and read paths are fully decoupled, with a Kafka-compatible backend in between. This is going to be a multi-quarter effort and we would like to progressively upstream code changes while building it. The idea is that we'll do our best to keep these changes as isolated from the rest of Mimir as possible, with few integration points.
In this PR I'm proposing to upstream basic support for writing incoming requests from the distributor to a Kafka-compatible backend.
Notes:
Which issue(s) this PR fixes or relates to
N/A
Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`.
- `about-versioning.md` updated with experimental features.