Dekaf runtime changes #1854

Merged
merged 5 commits into from
Jan 6, 2025

Conversation

jshearer
Contributor

@jshearer jshearer commented Jan 3, 2025

Description:

Rolls up a couple more changes to the agent and runtime for Dekaf:

  • Detect tasks intended for Dekaf inside Runtime::unary_materialize() and route them to dekaf::connector::unary_materialize(). Doing it here instead of in build allows more of Flow to interact with Dekaf tasks correctly:
    • The field selection UI uses a dry_run publication (i.e. a validate call) to fetch which fields to show. The runtime now intercepts that call and passes it to Dekaf, because we set connector_type based on the endpoint config in walk_materialization():
      models::MaterializationEndpoint::Dekaf(config) => (
          flow::materialization_spec::ConnectorType::Dekaf as i32,
          serde_json::to_string(config).unwrap(),
      ),
    • The connector_tags agent task calls a connector's Spec RPC, and we need to get Dekaf's endpoint and resource config schemas somehow. Rather than writing something to fetch them manually, we send connector_type: ConnectorType::Dekaf when we detect a task whose image_name starts with ghcr.io/estuary/dekaf-.
  • I also had to skip image pulling in connector_tags when processing a Dekaf row, since there is no image to pull.
  • Move unseal out of runtime into its own crate. runtime now needs to depend on dekaf, and both need unseal, so unseal had to move somewhere, and there didn't seem to be a better place for it than its own crate.
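
The routing described in the first bullet can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `ConnectorType`, `Request`, and `Handler` here are hypothetical stand-ins for the real Flow types, and the function only reports which handler would be chosen rather than invoking `dekaf::connector::unary_materialize()`.

```rust
/// Hypothetical stand-in for `flow::materialization_spec::ConnectorType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectorType {
    Image,
    Local,
    Dekaf,
}

/// Hypothetical stand-in for a unary materialization request
/// (for example a Spec or Validate call).
#[derive(Debug)]
pub struct Request {
    pub connector_type: ConnectorType,
    pub config_json: String,
}

/// Which code path would serve the request. Illustrative only.
#[derive(Debug, PartialEq, Eq)]
pub enum Handler {
    /// Served in-process by Dekaf's unary_materialize.
    Dekaf,
    /// Served by the regular connector path.
    Connector,
}

/// Choose a handler from the connector type carried on the request.
/// Because the decision is made at request time, a dry-run validate
/// and a Spec call follow the same route as any other request.
pub fn unary_materialize(request: &Request) -> Handler {
    match request.connector_type {
        ConnectorType::Dekaf => Handler::Dekaf,
        ConnectorType::Image | ConnectorType::Local => Handler::Connector,
    }
}
```

Matching exhaustively on the connector type, instead of using a wildcard arm, means that a newly added variant forces a compile-time decision about where it should be routed.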

`runtime` now needs to depend on `dekaf`, and both need `unseal`, so `unseal` had to move somewhere, and there didn't seem to be a better place for it than its own crate.
For field selection to work in the UI, it needs to be able to validate a draft that contains a Dekaf connector. This change detects when a request is for a Dekaf materialization and re-routes it to Dekaf's `unary_materialize`. It also lets the `connector_tags` job work for Dekaf, with a small change that detects image names matching `ghcr.io/estuary/dekaf-*` and marks them as `ConnectorType::Dekaf`.
Member

@psFried psFried left a comment

LGTM

Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
let proto_type = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
Member

I kinda feel like this conditional should be in runtime::flow_runtime_protocol. Is there a reason to prefer it to be in agent?

Contributor Author

Yeah, you're right. I read runtime::flow_runtime_protocol and parse_image_inspection briefly and saw that docker inspect call, but it's trivial to check the image first. 👍
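
A rough sketch of what "check the image first" could look like, assuming the prefix check simply short-circuits before any image inspection. This is not the actual `runtime::flow_runtime_protocol`: the `Protocol` enum, the `protocol_for_image` function, and the `inspect` callback (standing in for the docker inspect step) are hypothetical. The prefix value is the one given in the PR description.

```rust
/// Prefix named in the PR description for Dekaf "images".
pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-";

/// Hypothetical stand-in for the runtime's protocol type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    Capture,
    Materialize,
}

/// Determine the protocol for an image name.
///
/// `inspect` is a hypothetical stand-in for the image inspection step.
/// It is only called for non-Dekaf images, so no pull or inspect is
/// attempted for a Dekaf row, which has no real image behind it.
pub fn protocol_for_image<F>(image_name: &str, inspect: F) -> Result<Protocol, String>
where
    F: FnOnce(&str) -> Result<Protocol, String>,
{
    if image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
        // Dekaf tasks are materializations, so answer without inspecting.
        return Ok(Protocol::Materialize);
    }
    inspect(image_name)
}
```

Passing the inspection step in as a closure keeps the sketch testable without Docker. In the real code the check would presumably sit at the top of the existing function.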
