-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dekaf runtime changes #1854
Dekaf runtime changes #1854
Conversation
We need `runtime` to depend on `dekaf`, and they both need `unseal`, so it needed to be moved somewhere, and there didn't seem to be anywhere good to put it other than its own crate.
In order for field selection to work in the UI, it needs to be able to validate a draft with a Dekaf connector in it. This detects when the request is for a Dekaf materialization and re-routes it to Dekaf's `unary_materialize`. It also means that the `connector_tags` job can work for Dekaf with a small change to detect image names starting in `ghcr.io/estuary/dekaf-*` and mark them as `ConnectorType::Dekaf`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
crates/agent/src/connector_tags.rs
Outdated
Err(err) => { | ||
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol"); | ||
return Ok((row.tag_id, JobStatus::SpecFailed)); | ||
let proto_type = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kinda feel like this conditional should be in runtime::flow_runtime_protocol
. Is there a reason to prefer it to be in agent
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you're right. I read runtime::flow_runtime_protocol
and parse_image_inspection
briefly and saw that docker inspect
call, but it's trivial to check the image first. 👍
Description:
Rolls up a couple more changes to the agent and runtime for Dekaf:
Runtime::unary_materialize()
and route them todekaf::connector::unary_materialize()
. Doing it here instead of inbuild
allows more of Flow to interact with Dekaf tasks correctly:dry_run
publication (i.e avalidate
call) to fetch which fields to show. This now intercepts that call and correctly passes it to Dekaf because we setconnector_type
based on the endpoint config inwalk_materialization()
:flow/crates/validation/src/materialization.rs
Lines 126 to 128 in 44dc0db
connector_tags
agent task calls a connector'sSpec
RPC, and we need to get Dekaf's endpoint/resource config schemas somehow so I figured rather than writing a thing to do it manually, all we need to do is sendconnector_type: ConnectorType::Dekaf
when we detect a task withimage_name
starting inghcr.io/estuary/dekaf-
.connector_tags
when processing a Dekaf row as obviously there's no image to pull.runtime
to depend ondekaf
, and they both needunseal
so it needed to be moved somewhere, and there didn't seem to be anywhere good to put it other than its own crate.This change is