
Add experimental support to write incoming data to a Kafka-compatible backend #6888

Merged
merged 6 commits into from
Dec 12, 2023

Conversation

pracucci
Collaborator

@pracucci pracucci commented Dec 11, 2023

What this PR does

We're building a prototype of an alternative Mimir architecture where the write and read paths are fully decoupled, with a Kafka-compatible backend in between. This is going to be a multi-quarter effort, and we would like to progressively upstream code changes while building it. The idea is that we'll do our best to keep these changes as isolated from the rest of Mimir as possible, with few integration points.

In this PR I'm proposing to upstream basic support for writing incoming requests from the distributor to a Kafka-compatible backend.

Notes:

  • Configuration is hidden from the auto-generated documentation to avoid confusing end users while we build the prototype.
  • No CHANGELOG entry while building the prototype, for the same reason as above: to avoid confusing end users.
  • The integration point in the distributor is not the final one. The current way of partitioning data is very hacky, while @pstibrany is working on the definitive partitioning scheme.
  • This PR also introduces the franz-go Kafka client, which is the library we selected after testing it against the segmentio one. Based on our tests, the franz-go library looks more full-featured, mature, and stable than the segmentio one.

Which issue(s) this PR fixes or relates to

N/A

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment
Mostly LGTM; I have one more important remark about the timeouts, but otherwise it looks good. I quite liked TestWriter_WriteSync.

// after being sent on the network. The actual timeout is increased by the configured overhead.
kgo.RecordRetries(math.MaxInt64),
kgo.RecordDeliveryTimeout(w.writerCfg.KafkaWriteTimeout),
kgo.ProduceRequestTimeout(w.writerCfg.KafkaWriteTimeout),
Contributor

shouldn't we set this to something lower? Otherwise I can see how a network timeout would result in no retries:

  1. try to send
  2. wait for ProduceRequestTimeout (== w.writerCfg.KafkaWriteTimeout)
  3. time out
  4. try to send again
  5. fail, it's already past RecordDeliveryTimeout

Should we set it to something like w.writerCfg.KafkaWriteTimeout / 3?

Collaborator Author

@pracucci pracucci Dec 11, 2023
Discussed offline.

The TL;DR is that it depends on the actual failure scenario.

If the backend is slow, then a higher ProduceRequestTimeout increases the chances of a successful request (in other words, one 10s try is better than two 5s tries, because sometimes just waiting longer fixes it).

If the backend is unhealthy, then a shorter ProduceRequestTimeout is better, because the client will retry within the RecordDeliveryTimeout. However, the retry will succeed only if, in the meantime, the cluster metadata has been updated and the replica owning the given partition has been moved to another broker; otherwise the client will just keep trying to connect to the previous (unhealthy) one. In a setup where these timeouts are relatively low, it may not be that common that the unhealthy replica has actually been detected as unhealthy and the replica ownership for the partition moved.

Comment on lines +80 to +92
received := mimirpb.WriteRequest{}
require.NoError(t, received.Unmarshal(fetches.Records()[0].Value))
require.Len(t, received.Timeseries, len(multiSeries))

for idx, expected := range multiSeries {
	assert.Equal(t, expected.Labels, received.Timeseries[idx].Labels)
	assert.Equal(t, expected.Samples, received.Timeseries[idx].Samples)
}

Contributor

Reading these tests, I'm not sure whether the interface of Writer shouldn't just accept a byte slice instead of managing Mimir's protocol buffers.

I don't insist on this, but I thought it might give better separation and reduce scope a little bit.

Collaborator Author

@pracucci pracucci Dec 11, 2023

My take is: the byte-level interface is the Kafka client. Our Writer has a domain-level interface, which means we write domain-level data structures (so timeseries & co).

@pracucci pracucci force-pushed the add-kafka-producer-support branch from 641cda1 to 95b49c7 on December 11, 2023 16:11
@pracucci pracucci marked this pull request as ready for review December 11, 2023 16:35
@pracucci pracucci requested review from grafanabot and a team as code owners December 11, 2023 16:35
Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment

LGTM, thanks for addressing my comments

@pracucci pracucci merged commit 24591ae into main Dec 12, 2023
@pracucci pracucci deleted the add-kafka-producer-support branch December 12, 2023 05:04
Comment on lines +227 to +237
// By default, the Kafka client allows 1 in-flight Produce request per broker. By disabling write
// idempotency (which we don't need), we can increase the max number of in-flight Produce requests
// per broker. A higher number of in-flight requests, in addition to short client-side buffering
// ("linger") before firing the next Produce request, allows us to reduce the end-to-end latency.
//
// The product of the producer linger and the max in-flight requests should match the maximum
// Produce latency expected from the Kafka backend in a steady state. For example, 50ms * 20 requests = 1s,
// which means the Kafka client will keep issuing a Produce request every 50ms as long as the Kafka backend
// doesn't take longer than 1s to process them (if it takes longer, the client will buffer data and stop
// issuing new Produce requests until some previous ones complete).
kgo.DisableIdempotentWrite(),

I think this is true in other clients, but in franz-go, disabling idempotency forces the client to only issue one request at a time -- franz-go favors not duplicating data. With idempotency, franz-go allows 5 requests per broker (technically 4 due to some internal accounting but it's close enough).


Oh wait I missed MaxProduceRequestsInflightPerBroker just two lines down, my mistake. Nice!

Collaborator Author

Thanks for looking at it and your feedback!
