Dekaf materialization endpoint support #1840
base: master
Conversation
Force-pushed 173b9a2 to 2528553
Force-pushed 41b05c2 to 457cb62
Force-pushed cd55a23 to a16d4c5
Force-pushed a432255 to ff0ba1d
Force-pushed e1c4157 to 4a38630
Alright, I could probably do another few rounds of self-review here, but I think it's at a good enough spot to mark this as ready for review. Let me know what y'all think!
Sending comments from a partial review. I haven't finished.
Okay, I've finished a pass.
As a meta comment, for big features like this please either:
a) Open multiple smaller PRs which can be reviewed independently, or
b) Maintain evergreen commits which are re-written for the reader: they should tell bite-sized stories about the changes being made, stand on their own, and be feasible to review (or re-review) independently. This is the workflow @psFried and I use. In essence each commit is a mini-PR -- it requires that you regularly rebase/squash/fixup as you develop (see the sketch just after this list).
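For what it's worth, the fixup/squash mechanics behind that workflow usually look something like this (illustrative commands, not a prescribed recipe):

```sh
# Stage a change destined for an earlier commit, then record it as a fixup:
git add -p
git commit --fixup=<sha-of-commit-being-amended>

# Later, fold all fixup commits back into their target commits in one pass:
git rebase -i --autosquash master
```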
```rust
// Create value shape by merging all projected fields in the schema
let (field_selected_shape, projections) = if let Some(binding) = binding {
    let selection = binding
        .field_selection
```
As a general comment, I'd like to see this kind of extraction and swizzling pulled out into pure functions over input data structures, returning a new output data structure (or an error, if validation fails). I'll point to the various `Task` structs in the `runtime` crate as an example of this.
This makes them testable without postgrest in the loop. Recently, it became possible to build catalog specs in Rust as part of tests, which would let you exercise your logic with "real" tasks. (An example: https://github.com/estuary/flow/blob/master/crates/activate/src/lib.rs#L858-L875)
Big picture, the goal is to roughly separate routines into "glue" (connecting services, moving data, calling things) which are easy to inspect because they obviously succeed or obviously fail, and "business logic" (extract / validate / swizzle / build), which are easy to test if they're pure functions.
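A minimal sketch of that separation (names are hypothetical, not taken from this PR): the field-selection logic becomes a pure function over plain data, and a unit test can drive it with no services in the loop.

```rust
/// Hypothetical input: just the pieces of a built materialization spec that
/// field selection actually depends on.
pub struct FieldSelectionInput {
    pub selected_fields: Vec<String>,
    pub collection_projections: Vec<String>,
}

/// Hypothetical output data structure produced by the pure "business logic".
pub struct SelectedFields(pub Vec<String>);

/// Pure function: no I/O in the loop -- data in, data (or error) out.
pub fn select_fields(input: &FieldSelectionInput) -> Result<SelectedFields, String> {
    let mut out = Vec::new();
    for field in &input.selected_fields {
        // Validation fails as a value, which a test can assert on directly.
        if !input.collection_projections.iter().any(|p| p == field) {
            return Err(format!("selected field {field} has no projection"));
        }
        out.push(field.clone());
    }
    Ok(SelectedFields(out))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rejects_unprojected_fields() {
        let input = FieldSelectionInput {
            selected_fields: vec!["id".to_string(), "missing".to_string()],
            collection_projections: vec!["id".to_string()],
        };
        assert!(select_fields(&input).is_err());
    }
}
```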
Force-pushed 6f12370 to 692aeb2
…s using `/authorize/dekaf` and `/authorize/task`

Also add a hint for passing a collection name as a topic name, when the binding has renamed that topic
* Connector projections: emit recommended constraints for all fields (sketched below) -- you get everything by default, and you can modify the selection however you like
* Schema: Build a schema from the materialization's built spec's `field_selection` and the collection's projections that will match the extracted documents
* Extraction: Implement field extraction using the `extractors` crate to emit documents that match the "value schema"
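As a sketch of the first bullet's behavior (the enum and function here are illustrative stand-ins, not the materialization protocol's actual types): every projected field gets a "recommended" constraint, so the default selection includes everything while still letting the user prune it.

```rust
// Illustrative only: a trimmed-down stand-in for constraint types.
enum Constraint {
    Recommended, // selected by default; the user may deselect it
    Forbidden,   // can never be selected
}

// Emit a recommended constraint for every projection, so the default
// field selection is "everything", and the user prunes from there.
fn constraints_for(projections: &[String]) -> Vec<(String, Constraint)> {
    projections
        .iter()
        .map(|p| (p.clone(), Constraint::Recommended))
        .collect()
}
```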
Force-pushed ec0b63f to 94edba4
… a Session and write them to the correct ops logs journal

Also support filtering logs by the requested shard log level
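That filtering reduces to a single comparison; a minimal sketch, assuming the `tracing` crate's `Level` ordering:

```rust
use tracing::Level;

// Forward an event only if it is no more verbose than the shard's requested
// level. In `tracing`, `Level` orders ERROR < WARN < INFO < DEBUG < TRACE,
// so "more verbose" compares as greater.
fn should_forward(event: Level, shard_max: Level) -> bool {
    event <= shard_max
}
```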
Then implement some tests to validate field selection logic
Force-pushed 94edba4 to debb6ca
Okay, I think this is ready for another look. I changed around the field selection to be closer to your suggested pattern of building up a list of extractors and their associated avro schemas. The realization that avro documents can be encoded sequentially allowed me to avoid building up the full […]. FWIW though, I ended up generating a […]
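That sequential-encoding trick can be sketched as follows (assuming the `apache-avro` crate; the `FieldEncoder` type and extractor closure are hypothetical, not this PR's actual API). Avro encodes a record as the in-order concatenation of its fields' encodings, so each field can be appended to the output buffer as it is extracted, without first assembling a complete record value:

```rust
use apache_avro::{to_avro_datum, types::Value, AvroResult, Schema};

// Hypothetical per-field plan: a schema plus a closure that pulls the
// field's value out of a source document.
struct FieldEncoder {
    schema: Schema,
    extract: Box<dyn Fn(&serde_json::Value) -> Value>,
}

// Encode one document by concatenating each field's Avro datum in order.
// This relies on Avro records being encoded as the in-order concatenation
// of their fields, with no framing in between.
fn encode_record(doc: &serde_json::Value, fields: &[FieldEncoder]) -> AvroResult<Vec<u8>> {
    let mut buf = Vec::new();
    for f in fields {
        buf.extend(to_avro_datum(&f.schema, (f.extract)(doc))?);
    }
    Ok(buf)
}
```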
I'll take a look at this and come up with a good way to unit test […]

For example, I extracted the log forwarding stuff into its own module which roughly exposes an interface for how to use it (with the rough edges getting refined over time, as the things it needs to do become clearer), so that's all you should have to think about when you want to forward logs -- this should probably eventually be moved to its own crate, especially if anything else wants to use it later on. But this doesn't sound to me like the same kind of separation you're asking for.

As far as your meta-comments: like I mentioned over Slack, I certainly have been opening PRs along the way, though granted I could have been doing even more of that. And as far as evergreen commits, that is indeed always my goal, and with this round of changes I've done some rewriting and condensing of the commits to make them more self-contained. I do have a question about that though: when you're at the point of pushing your work up to a PR, do you actually just squash everything into one set of changes, and then chunk it out from there into separate "mini-PR" commits? Because I've been trying to work with evergreen commits the whole time, and if I need to make more-than-trivial changes, I find myself spending more time than I'd like figuring out exactly which chunks of changes to send to which commit. Thoughts?
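As an illustration of that module boundary (all names here are hypothetical, not the PR's actual types): callers hold a cheap cloneable handle and send log events through it, while routing to the correct ops journal stays inside the module.

```rust
use tokio::sync::mpsc;

// Hypothetical log event; the real module's types will differ.
pub struct LogEvent {
    pub level: tracing::Level,
    pub message: String,
}

// The only thing the rest of the session code needs to see.
#[derive(Clone)]
pub struct LogForwarder(mpsc::Sender<LogEvent>);

impl LogForwarder {
    // Callers just send; where the event ends up is this module's concern.
    pub async fn log(&self, event: LogEvent) {
        let _ = self.0.send(event).await;
    }
}
```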
Description:

This adds support for the server side of Dekaf's support for materialization endpoints. At a high level, Dekaf is just another way to get data out of Flow. We already have a well fleshed-out concept for these things: materializations. So back in #1665 we introduced support for a new materialization "endpoint type": `dekaf`. This lives alongside `local` and `connector` as the third kind of materialization, and is configured like so:
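A rough sketch of what such a spec can look like (hedged: the field names under `dekaf` and the binding resource are illustrative, not the authoritative config schema):

```yaml
materializations:
  acmeCo/my-dekaf-materialization:
    endpoint:
      dekaf:
        # Illustrative fields only; the real endpoint config may differ.
        config:
          token: my-auth-token
    bindings:
      - source: acmeCo/my-collection
        resource:
          topic_name: my-topic
```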
The second part of this work is for Dekaf the server to support this mode of operation. Briefly, it needs to:
I still have a couple of things on my list before this is fully wrapped up:

* `SessionAuthentication::Task` sessions to a new/different backing store for migration purposes