Skip to content

Commit

Permalink
feat(api): stream user pipeline trigger response (#318)
Browse files Browse the repository at this point in the history
feat(api): stream user pipeline trigger response

Because:
- We want to support streaming responses for triggering user pipelines,
which is useful for real-time inference when low latency is a concern.
- The current RPC method for triggering user pipelines returns the
response synchronously, which is not suitable for streaming.
  • Loading branch information
tillknuesting authored May 21, 2024
1 parent e1f183d commit bf709f3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
29 changes: 29 additions & 0 deletions vdp/pipeline/v1beta/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,35 @@ message TriggerUserPipelineResponse {
TriggerMetadata metadata = 2;
}

// TriggerUserPipelineWithStreamRequest represents a request to trigger a
// user-owned pipeline synchronously and stream back the results.
message TriggerUserPipelineWithStreamRequest {
  // The resource name of the pipeline, which allows its access by parent user
  // and ID.
  // - Format: `users/{user.id}/pipelines/{pipeline.id}`.
  string name = 1 [
    (google.api.field_behavior) = REQUIRED,
    (google.api.resource_reference) = {type: "api.instill.tech/Pipeline"},
    (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
      field_configuration: {path_param_name: "user_pipeline_name"}
    }
  ];
  // Pipeline input parameters.
  repeated google.protobuf.Struct inputs = 2 [(google.api.field_behavior) = REQUIRED];
  // Pipeline secrets parameters that will override the pipeline's or owner's
  // secrets.
  map<string, string> secrets = 3 [(google.api.field_behavior) = OPTIONAL];
}

// TriggerUserPipelineWithStreamResponse carries the results of a pipeline
// execution, i.e., the multiple model inference outputs.
message TriggerUserPipelineWithStreamResponse {
  // Model inference outputs.
  repeated google.protobuf.Struct outputs = 1;
  // Traces of the pipeline inference.
  TriggerMetadata metadata = 2;
}


// TriggerAsyncUserPipelineRequest represents a request to trigger a user-owned
// pipeline asynchronously.
message TriggerAsyncUserPipelineRequest {
Expand Down
19 changes: 19 additions & 0 deletions vdp/pipeline/v1beta/pipeline_public_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,25 @@ service PipelinePublicService {
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Trigger"};
}



// Trigger a pipeline owned by a user and stream back the response
//
// Triggers the execution of a pipeline and streams back the response over a
// server-streaming RPC. This method is intended for real-time inference when
// low latency is of concern and the response needs to be processed
// incrementally.
//
// The pipeline is identified by its resource name, formed by the parent user
// and ID of the pipeline.
rpc TriggerUserPipelineWithStream(TriggerUserPipelineWithStreamRequest) returns (stream TriggerUserPipelineWithStreamResponse) {
  option (google.api.http) = {
    post: "/v1beta/{name=users/*/pipelines/*}/trigger:stream"
    body: "*"
  };
  option (google.api.method_signature) = "name,inputs";
  option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {tags: "Trigger"};
}

// Trigger a pipeline owned by a user asynchronously
//
// Triggers the execution of a pipeline asynchronously, i.e., the result
Expand Down

0 comments on commit bf709f3

Please sign in to comment.