Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dekaf materialization endpoint support #1840

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Dec 18, 2024

Description:

This adds support for the server side of Dekaf's support for materialization endpoints. At a high level, Dekaf is just another way to get data out of Flow. We already have a well fleshed out concept for these things: materializations. So back in #1665 we introduced support for a new materialization "endpoint type": dekaf. This lives alongside local and connector as the third kind of materialization, and is configured like so:

    endpoint:
      dekaf:
        variant: some-dekaf-variant
        config:
          token: "foo"
          ...other configuration for the behavior of your Dekaf task... 

The second part of this work is for Dekaf the server to support this mode of operation. Briefly, it needs to:

  • Support authentication and authorization using the control plane. This means that conceptually, Dekaf is authenticating you based on the task name and token specified in the endpoint config, and authorizing you based on the access granted to that task
    • Specifically, this means that you now use the task name for your username, and the token specified in the endpoint config when connecting to Dekaf.
  • Act more like a regular materialization. This means...
    • Support for field selection. Materializations let you specify which field(s) you want included in your destination using field selection. This fundamentally looks like a document transformation, where some fields may be removed and some fields may be projected from their original home to a new location in the output.
    • Support for task logs. Just like captures, materialization log output is captured and presented to users for status monitoring and debugging. As Dekaf is multi-tenant at its core, presenting these logs requires identifying which log messages are associated with which task, and then capturing and writing them to the corresponding logs journal.
    • [Fast-Follow] Support for task stats. In order to monitor the status of your running tasks, as well as aggregate usage information for billing, all tasks need to periodically emit information about how much work they've been doing. While Dekaf was in beta, all usage was free so this was less of a priority, but now that there's a task to associate stats with, implementing stats will likely be one of the last things to do before going GA.

I still have a couple of things on my list before this is fully wrapped up:

  • Implement emitting stats
  • Implement CI task for Dekaf integration tests
  • Implement routing SessionAuthentication::Task sessions to a new/different backing store for migration purposes
  • Figure out how to make shard statuses show green in the UI for Dekaf tasks

This change is Reviewable

@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 4 times, most recently from 173b9a2 to 2528553 Compare January 6, 2025 19:14
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 7 times, most recently from 41b05c2 to 457cb62 Compare January 13, 2025 14:50
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 14 times, most recently from cd55a23 to a16d4c5 Compare January 15, 2025 21:53
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch from a432255 to ff0ba1d Compare January 16, 2025 03:15
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 2 times, most recently from e1c4157 to 4a38630 Compare January 16, 2025 03:26
@jshearer jshearer marked this pull request as ready for review January 16, 2025 23:15
@jshearer
Copy link
Contributor Author

Alright, I could probably do another few rounds of self-review here, but I think it's at a good enough spot to mark this as ready for review. Let me know what y'all think!

Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending comments from a partial review. I haven't finished.

crates/dekaf/src/connector.rs Outdated Show resolved Hide resolved
crates/dekaf/src/connector.rs Outdated Show resolved Hide resolved
crates/dekaf/src/connector.rs Outdated Show resolved Hide resolved
crates/dekaf/src/lib.rs Outdated Show resolved Hide resolved
crates/dekaf/src/log_journal.rs Outdated Show resolved Hide resolved
crates/dekaf/src/main.rs Outdated Show resolved Hide resolved
crates/dekaf/src/main.rs Outdated Show resolved Hide resolved
crates/dekaf/src/read.rs Outdated Show resolved Hide resolved
crates/dekaf/src/read.rs Outdated Show resolved Hide resolved
crates/dekaf/src/read.rs Outdated Show resolved Hide resolved
Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I've finished a pass.

As a meta comment, for big features like this please either:
a) Open multiple smaller PRs which can be reviewed independently, or

b) Maintain ever-green commits which are re-written for the reader: they should tell more bite-size stories about changes being made, stand on their own, and be feasible to review (or re-review) independently. This is the workflow @psFried and I use. In essence each commit is a mini-PR -- it requires that you regularly rebase/squash/fixup as you develop.

crates/dekaf/src/topology.rs Outdated Show resolved Hide resolved
crates/dekaf/src/topology.rs Outdated Show resolved Hide resolved
// Create value shape by merging all projected fields in the schema
let (field_selected_shape, projections) = if let Some(binding) = binding {
let selection = binding
.field_selection
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general comment, I'd like to see this kind of extraction and swizzling pulled out into pure functions over input data structures, returning a new output data structure (or error, if validation fails). I'll point to the various Task structs in the runtime crate as an example of this.

This makes them test-able without postgrest in the loop. Recently, it became possible to build catalog specs in Rust as part of tests, which would let you exercise your logic with "real" tasks. (An example: https://github.com/estuary/flow/blob/master/crates/activate/src/lib.rs#L858-L875)

Big picture, the goal is to roughly separate routines into "glue" (connecting services, moving data, calling things) which are easy to inspect because they obviously succeed or obviously fail, and "business logic" (extract / validate / swizzle / build), which are easy to test if they're pure functions.

crates/dekaf/src/utils.rs Outdated Show resolved Hide resolved
crates/dekaf/src/utils.rs Outdated Show resolved Hide resolved
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 2 times, most recently from 6f12370 to 692aeb2 Compare January 29, 2025 23:41
…s using `/authorize/dekaf` and `/authorize/task`

Also add a hint for passing a collection name as a topic name, when the binding has renamed that topic
* Connector projections: emit recommended constraints for all fields -- you get everything by default, and you can modify the selection however you like
* Schema: Build a schema from the materialization's built spec's `field_selection` and the collection's projections that will match the extracted documents
* Extraction: Implement field extraction using the `extractors` crate to emit documents that match the "value schema"
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 3 times, most recently from ec0b63f to 94edba4 Compare January 30, 2025 17:01
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch from 94edba4 to debb6ca Compare January 30, 2025 18:08
@jshearer
Copy link
Contributor Author

jshearer commented Jan 30, 2025

Okay, I think this is ready for another look. I changed around the field selection to be closer to your suggested pattern of building up a list of extractors and their associated avro schemas. The realization that avro documents can be encoded sequentially allowed me to avoid building up the full HeapNode before encoding, which should reduce memory usage a good bit.

FWIW though, I ended up generating a (avro::Schema, Vec<doc::Extractor>) in build_field_extractors(), mainly because I need the full schema in Collection in order to save it to the database and get a content-addressed ID. I then extract out the field schemas and zip them with the extractors in Read. I suppose I could instead avoid building the full schema in build_field_extractors and just build it when dealing with schema registry stuff... this feels like six of one, half dozen of the other IMO.

Recently, it became possible to build catalog specs in Rust as part of tests, which would let you exercise your logic with "real" tasks.

I'll take a look at this and come up with a good way to unit test build_field_extractors(). As it stands now, are you still wanting more separation between "glue" and "business logic"? If so, could you try to articulate where to draw that imaginary line? Usually when I think about this sort of thing I've been thinking about what logic can be written in a more reusable/more broadly applicable way (i.e what belongs in an "externally" exposed interface vs what should remain internal, etc), but it sounds like you're talking more about distinguishing between things that can fail vs things that can't?

For example, I extracted the log forwarding stuff into its own module which roughly exposes an interface for how to use it (with the rough edges getting refined over time, as the things it needs to do becomes more clear), so that's all you should have to think about when you want to forward logs -- this should probably eventually be moved to its own crate, especially if anything else wants to use it as well later on. But this doesn't sound to me like the same kind of separation you're asking for.

As far as your meta-comments, like I mentioned over Slack, I certainly have been opening PRs along the way, though granted I could have been doing even more of that.

And as far as evergreen commits, that is indeed always my goal, and with this round of changes I've done some rewriting and condensing of the commits to make them more self-contained. I do have a question about that though: when you're at the point of pushing your work up to a PR, do you actually just squash everything into one set of changes, and then chunk it out from there into separate "mini-PR" commits? Because I've been trying to work with the evergreen commits the whole time, and if I need to make more-than-trivial changes, I find myself spending more time than I'd like on figuring out exactly which chunks of changes to send to which commit. Thoughts?

@jshearer jshearer requested a review from jgraettinger January 30, 2025 19:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants