diff --git a/integration-test/const.js b/integration-test/const.js index 5e23c4b34..403b8462a 100644 --- a/integration-test/const.js +++ b/integration-test/const.js @@ -28,10 +28,23 @@ export const detSyncMultiModelInstRecipe = { export const dstCSVConnID = "some-cool-name-for-dst-csv-connector" -export const detAsyncRecipe = { +export const detAsyncSingleModelInstRecipe = { recipe: { source: "source-connectors/source-http", - model_instances: [`models/${model_id}/instances/${model_instance_id}`], + model_instances: [ + `models/${model_id}/instances/${model_instance_id}` + ], + destination:`destination-connectors/${dstCSVConnID}` + }, +}; + +export const detAsyncMultiModelInstRecipe = { + recipe: { + source: "source-connectors/source-http", + model_instances: [ + `models/${model_id}/instances/${model_instance_id}`, + `models/${model_id}/instances/${model_instance_id}`, + ], destination:`destination-connectors/${dstCSVConnID}` }, }; diff --git a/integration-test/rest-pipeline.js b/integration-test/rest-pipeline.js index 7a6c08427..1f08ad9ca 100644 --- a/integration-test/rest-pipeline.js +++ b/integration-test/rest-pipeline.js @@ -250,9 +250,9 @@ export function CheckList() { [`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22 response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0, }); - check(http.request("GET", `${pipelineHost}/v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22`, null, {headers: {"Content-Type": "application/json",}}), { - [`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22 response 200`]: (r) => r.status == 200, - [`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22 response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0, + check(http.request("GET", `${pipelineHost}/v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.destination=%22${dstConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22`, null, {headers: {"Content-Type": "application/json",}}), { + [`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${dstConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22 response 200`]: (r) => r.status == 200, + [`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${dstConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22 response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0, }); // Delete the pipelines @@ -474,7 +474,7 @@ export function CheckUpdateState() { { id: randomString(10), }, - constant.detAsyncRecipe + constant.detAsyncSingleModelInstRecipe ) check(http.request("POST", `${pipelineHost}/v1alpha/pipelines`, JSON.stringify(reqBodyAsync), { @@ -483,7 +483,7 @@ export function CheckUpdateState() { }, }), { "POST /v1alpha/pipelines async pipeline creation response status is 201": (r) => r.status === 201, - "POST /v1alpha/pipelines async pipeline creation response pipeline state ACTIVE": (r) => r.json().pipeline.state === "STATE_INACTIVE", + "POST /v1alpha/pipelines async pipeline creation response pipeline state ACTIVE": (r) => r.json().pipeline.state === "STATE_ACTIVE", }); check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBodyAsync.id}:activate`, null, { diff --git a/integration-test/rest-trigger-async.js b/integration-test/rest-trigger-async.js new file mode 100644 index 000000000..2ea2ebe79 --- /dev/null +++ b/integration-test/rest-trigger-async.js @@ -0,0 +1,256 @@ +import http from "k6/http"; +import encoding from "k6/encoding"; + +import { FormData } from "https://jslib.k6.io/formdata/0.0.2/index.js"; +import { check, group } from "k6"; +import { randomString } from "https://jslib.k6.io/k6-utils/1.1.0/index.js"; + +import * as constant from "./const.js" + +export function CheckTriggerAsyncSingleImageSingleModelInst() { + + var reqBody = Object.assign( + { + id: randomString(10), + description: randomString(50), + }, + constant.detAsyncSingleModelInstRecipe + ); + + group("Pipelines API: Trigger an async pipeline for single image and single model instance", () => { + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines`, JSON.stringify(reqBody), { + headers: { + "Content-Type": "application/json", + }, + }), { + "POST /v1alpha/pipelines response status is 201": (r) => r.status === 201, + }); + + var payloadImageURL = { + inputs: [ + { + image_url: "https://artifacts.instill.tech/dog.jpg", + } + ] + }; + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger`, JSON.stringify(payloadImageURL), { + headers: { + "Content-Type": "application/json", + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response status is 200`]: (r) => r.status === 200, + }); + + var payloadImageBase64 = { + inputs: [ + { + imageBase64: encoding.b64encode(constant.dogImg, "b"), + } + ] + }; + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger`, JSON.stringify(payloadImageBase64), { + headers: { + "Content-Type": "application/json", + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response status is 200`]: (r) => r.status === 200, + }); + + const fd = new FormData(); + fd.append("file", http.file(constant.dogImg)); + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), { + headers: { + "Content-Type": `multipart/form-data; boundary=${fd.boundary}`, + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (multipart) response status is 200`]: (r) => r.status === 200, + }); + + }); + + // Delete the pipeline + check(http.request("DELETE", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}`, null, { + headers: { + "Content-Type": "application/json", + }, + }), { + [`DELETE /v1alpha/pipelines/${reqBody.id} response status 204`]: (r) => r.status === 204, + }); +} + +export function CheckTriggerAsyncMultiImageSingleModelInst() { + var reqBody = Object.assign( + { + id: randomString(10), + description: randomString(50), + }, + constant.detAsyncSingleModelInstRecipe + ); + + group("Pipelines API: Trigger an async pipeline for multiple images and single model instance", () => { + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines`, JSON.stringify(reqBody), { + headers: { + "Content-Type": "application/json", + }, + }), { + "POST /v1alpha/pipelines response status is 201": (r) => r.status === 201, + }); + + var payloadImageURL = { + inputs: [ + { + image_url: "https://artifacts.instill.tech/dog.jpg", + }, + { + image_url: "https://artifacts.instill.tech/dog.jpg", + }, + { + image_url: "https://artifacts.instill.tech/dog.jpg", + } + ] + }; + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger`, JSON.stringify(payloadImageURL), { + headers: { + "Content-Type": "application/json", + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response status is 200`]: (r) => r.status === 200, + }); + + var payloadImageBase64 = { + inputs: [ + { + imageBase64: encoding.b64encode(constant.dogImg, "b"), + }, + { + imageBase64: encoding.b64encode(constant.dogImg, "b"), + }, + { + imageBase64: encoding.b64encode(constant.dogImg, "b"), + } + ] + }; + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger`, JSON.stringify(payloadImageBase64), { + headers: { + "Content-Type": "application/json", + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response status is 200`]: (r) => r.status === 200, + }); + + const fd = new FormData(); + fd.append("file", http.file(constant.dogImg)); + fd.append("file", http.file(constant.dogImg)); + fd.append("file", http.file(constant.dogImg)); + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), { + headers: { + "Content-Type": `multipart/form-data; boundary=${fd.boundary}`, + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (multipart) response status is 200`]: (r) => r.status === 200, + }); + + }); + + // Delete the pipeline + check(http.request("DELETE", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}`, null, { + headers: { + "Content-Type": "application/json", + }, + }), { + [`DELETE /v1alpha/pipelines/${reqBody.id} response status 204`]: (r) => r.status === 204, + }); +} + +export function CheckTriggerAsyncMultiImageMultiModelInst() { + var reqBody = Object.assign( + { + id: randomString(10), + description: randomString(50), + }, + constant.detAsyncMultiModelInstRecipe + ); + + group("Pipelines API: Trigger an async pipeline for multiple images and multiple model instances", () => { + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines`, JSON.stringify(reqBody), { + headers: { + "Content-Type": "application/json", + }, + }), { + "POST /v1alpha/pipelines response status is 201": (r) => r.status === 201, + }); + + var payloadImageURL = { + inputs: [ + { + image_url: "https://artifacts.instill.tech/dog.jpg", + }, + { + image_url: "https://artifacts.instill.tech/dog.jpg", + }, + { + image_url: "https://artifacts.instill.tech/dog.jpg", + } + ] + }; + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger`, JSON.stringify(payloadImageURL), { + headers: { + "Content-Type": "application/json", + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response status is 200`]: (r) => r.status === 200, + }); + + var payloadImageBase64 = { + inputs: [ + { + imageBase64: encoding.b64encode(constant.dogImg, "b"), + }, + { + imageBase64: encoding.b64encode(constant.dogImg, "b"), + }, + { + imageBase64: encoding.b64encode(constant.dogImg, "b"), + } + ] + }; + + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger`, JSON.stringify(payloadImageBase64), { + headers: { + "Content-Type": "application/json", + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response status is 200`]: (r) => r.status === 200, + }); + + const fd = new FormData(); + fd.append("file", http.file(constant.dogImg)); + fd.append("file", http.file(constant.dogImg)); + fd.append("file", http.file(constant.dogImg)); + check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), { + headers: { + "Content-Type": `multipart/form-data; boundary=${fd.boundary}`, + }, + }), { + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (multipart) response status is 200`]: (r) => r.status === 200, + }); + + }); + + // Delete the pipeline + check(http.request("DELETE", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}`, null, { + headers: { + "Content-Type": "application/json", + }, + }), { + [`DELETE /v1alpha/pipelines/${reqBody.id} response status 204`]: (r) => r.status === 204, + }); +} diff --git a/integration-test/rest-trigger.js b/integration-test/rest-trigger-sync.js similarity index 72% rename from integration-test/rest-trigger.js rename to integration-test/rest-trigger-sync.js index 528fe24b9..c1f09b850 100644 --- a/integration-test/rest-trigger.js +++ b/integration-test/rest-trigger-sync.js @@ -7,7 +7,7 @@ import { randomString } from "https://jslib.k6.io/k6-utils/1.1.0/index.js"; import * as constant from "./const.js" -export function CheckTriggerDirectSingleImageSingleModelInst() { +export function CheckTriggerSyncSingleImageSingleModelInst() { var reqBody = Object.assign( { @@ -41,12 +41,12 @@ export function CheckTriggerDirectSingleImageSingleModelInst() { "Content-Type": "application/json", }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageURL.inputs.length, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageURL.inputs.length, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, }); var payloadImageBase64 = { @@ -62,27 +62,25 @@ export function CheckTriggerDirectSingleImageSingleModelInst() { "Content-Type": "application/json", }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageBase64.inputs.length, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageBase64.inputs.length, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, }); const fd = new FormData(); fd.append("file", http.file(constant.dogImg)); - fd.append("file", http.file(constant.dogImg)); - fd.append("file", http.file(constant.dogImg)); check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), { headers: { "Content-Type": `multipart/form-data; boundary=${fd.boundary}`, }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === fd.parts.length, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === fd.parts.length, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, }); }); @@ -97,7 +95,7 @@ export function CheckTriggerDirectSingleImageSingleModelInst() { }); } -export function CheckTriggerDirectMultiImageSingleModelInst() { +export function CheckTriggerSyncMultiImageSingleModelInst() { var reqBody = Object.assign( { @@ -140,12 +138,12 @@ export function CheckTriggerDirectMultiImageSingleModelInst() { "Content-Type": "application/json", }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageURL.inputs.length, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageURL.inputs.length, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, }); var payloadImageBase64 = { @@ -164,12 +162,12 @@ export function CheckTriggerDirectMultiImageSingleModelInst() { "Content-Type": "application/json", }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageBase64.inputs.length, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === payloadImageBase64.inputs.length, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects[0].category`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].category === "test", + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output[0].detection_outputs[0].bounding_box_objects[0].bounding_box`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].bounding_box !== undefined, }); const fd = new FormData(); @@ -181,10 +179,10 @@ export function CheckTriggerDirectMultiImageSingleModelInst() { "Content-Type": `multipart/form-data; boundary=${fd.boundary}`, }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === fd.parts.length, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response output[0].detection_outputs.length`]: (r) => r.json().output[0].detection_outputs.length === fd.parts.length, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response output[0].detection_outputs[0].bounding_box_objects.length`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects.length === 1, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart response output[0].detection_outputs[0].bounding_box_objects[0].score`]: (r) => r.json().output[0].detection_outputs[0].bounding_box_objects[0].score === 1, }); }); @@ -199,7 +197,7 @@ export function CheckTriggerDirectMultiImageSingleModelInst() { }); } -export function CheckTriggerDirectMultiImageMultiModelInst() { +export function CheckTriggerSyncMultiImageMultiModelInst() { var reqBody = Object.assign( { @@ -242,8 +240,8 @@ export function CheckTriggerDirectMultiImageMultiModelInst() { "Content-Type": "application/json", }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (url) response output.length == 2`]: (r) => r.json().output.length === 2, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (url) response output.length == 2`]: (r) => r.json().output.length === 2, }); var payloadImageBase64 = { @@ -262,8 +260,8 @@ export function CheckTriggerDirectMultiImageMultiModelInst() { "Content-Type": "application/json", }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (base64) response output.length == 2`]: (r) => r.json().output.length === 2, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger (base64) response output.length == 2`]: (r) => r.json().output.length === 2, }); const fd = new FormData(); @@ -275,8 +273,8 @@ export function CheckTriggerDirectMultiImageMultiModelInst() { "Content-Type": `multipart/form-data; boundary=${fd.boundary}`, }, }), { - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response status is 200`]: (r) => r.status === 200, - [`POST /v1alpha/pipelines/${reqBody.id}/outputs (multipart) response output.length == 2`]: (r) => r.json().output.length === 2, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart (multipart) response status is 200`]: (r) => r.status === 200, + [`POST /v1alpha/pipelines/${reqBody.id}:trigger-multipart (multipart) response output.length == 2`]: (r) => r.json().output.length === 2, }); }); diff --git a/integration-test/rest.js b/integration-test/rest.js index 6a74ab8a1..391e5b667 100644 --- a/integration-test/rest.js +++ b/integration-test/rest.js @@ -6,7 +6,8 @@ import { randomString } from "https://jslib.k6.io/k6-utils/1.1.0/index.js"; import * as constant from "./const.js"; import * as pipeline from './rest-pipeline.js'; -import * as trigger from './rest-trigger.js'; +import * as triggerSync from './rest-trigger-sync.js'; +import * as triggerAsync from './rest-trigger-async.js'; const pipelineHost = "http://pipeline-backend:8081"; const connectorHost = "http://connector-backend:8082"; @@ -67,7 +68,7 @@ export function setup() { "destination_connector_definition": "destination-connector-definitions/destination-csv", "connector": { "configuration": JSON.stringify({ - "destination_path": "/local" + "destination_path": "/local/some-folder-in-airbyte-volume-local-path" }) } }), { @@ -78,6 +79,8 @@ export function setup() { "POST /v1alpha/destination-connectors response status for creating CSV destination connector 201": (r) => r.status === 201, }) + sleep(3) + }); group("Model Backend API: Deploy a detection model", function () { @@ -133,9 +136,14 @@ export default function (data) { pipeline.CheckRename() pipeline.CheckLookUp() - trigger.CheckTriggerDirectSingleImageSingleModelInst() - trigger.CheckTriggerDirectMultiImageSingleModelInst() - trigger.CheckTriggerDirectMultiImageMultiModelInst() + triggerSync.CheckTriggerSyncSingleImageSingleModelInst() + triggerSync.CheckTriggerSyncMultiImageSingleModelInst() + triggerSync.CheckTriggerSyncMultiImageMultiModelInst() + + triggerAsync.CheckTriggerAsyncSingleImageSingleModelInst() + triggerAsync.CheckTriggerAsyncMultiImageSingleModelInst() + triggerAsync.CheckTriggerAsyncMultiImageMultiModelInst() + } export function teardown(data) { diff --git a/internal/constant/constant.go b/internal/constant/constant.go index 6dcb8e1b3..a03f2d077 100644 --- a/internal/constant/constant.go +++ b/internal/constant/constant.go @@ -1,11 +1,5 @@ package constant -// ConnectionTypeDirectness is a slice records connector names having the connection-type directness -var ConnectionTypeDirectness = []string{ - "source-connectors/source-http", "source-connectors/source-grpc", - "destination-connectors/destination-http", "destination-connectors/destination-grpc", -} - const ( _ = iota KB = 1 << (10 * iota) diff --git a/internal/util/util.go b/internal/util/util.go deleted file mode 100644 index c85346fae..000000000 --- a/internal/util/util.go +++ /dev/null @@ -1,11 +0,0 @@ -package util - -// Contains checks if a string is present in a slice -func Contains(s []string, str string) bool { - for _, v := range s { - if v == str { - return true - } - } - return false -} diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 335302606..cfcd9088c 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -419,10 +419,6 @@ func (h *handler) TriggerPipeline(ctx context.Context, req *pipelinePB.TriggerPi return &pipelinePB.TriggerPipelineResponse{}, err } - if err := h.service.ValidatePipeline(dbPipeline); err != nil { - return &pipelinePB.TriggerPipelineResponse{}, err - } - triggerModelResp, err := h.service.TriggerPipeline(req, dbPipeline) if err != nil { return &pipelinePB.TriggerPipelineResponse{}, err @@ -467,10 +463,6 @@ func (h *handler) TriggerPipelineBinaryFileUpload(stream pipelinePB.PipelineServ return err } - if err := h.service.ValidatePipeline(dbPipeline); err != nil { - return err - } - // Read chuck buf := bytes.Buffer{} for { diff --git a/pkg/service/convert.go b/pkg/service/convert.go index 72d394686..2f34e8128 100644 --- a/pkg/service/convert.go +++ b/pkg/service/convert.go @@ -34,62 +34,65 @@ func (s *service) ownerRscNameToPermalink(ownerRscName string) (ownerPermalink s return ownerPermalink, nil } -func (s *service) recipeNameToPermalink(recipe *datamodel.Recipe) error { +func (s *service) recipeNameToPermalink(recipeRscName *datamodel.Recipe) (*datamodel.Recipe, error) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + recipePermalink := datamodel.Recipe{} + // Source connector getSrcConnResp, err := s.connectorServiceClient.GetSourceConnector(ctx, &connectorPB.GetSourceConnectorRequest{ - Name: recipe.Source, + Name: recipeRscName.Source, }) if err != nil { - return fmt.Errorf("[connector-backend: GetSourceConnector - Name: %s] %s", recipe.Source, err) + return nil, fmt.Errorf("[connector-backend] Error %s at source-connectors/%s: %s", "GetSourceConnector", recipeRscName.Source, err) } - srcColID, err := resource.GetCollectionID(recipe.Source) + srcColID, err := resource.GetCollectionID(recipeRscName.Source) if err != nil { - return err + return nil, err } - recipe.Source = srcColID + "/" + getSrcConnResp.GetSourceConnector().GetUid() + recipePermalink.Source = srcColID + "/" + getSrcConnResp.GetSourceConnector().GetUid() // Destination connector getDstConnResp, err := s.connectorServiceClient.GetDestinationConnector(ctx, &connectorPB.GetDestinationConnectorRequest{ - Name: recipe.Destination, + Name: recipeRscName.Destination, }) if err != nil { - return fmt.Errorf("[connector-backend: GetDestinationConnector - Name: %s] %s", recipe.Destination, err) + return nil, fmt.Errorf("[connector-backend] Error %s at destination-connectors/%s: %s", "GetDestinationConnector", recipeRscName.Destination, err) } - dstColID, err := resource.GetCollectionID(recipe.Destination) + dstColID, err := resource.GetCollectionID(recipeRscName.Destination) if err != nil { - return err + return nil, err } - recipe.Destination = dstColID + "/" + getDstConnResp.GetDestinationConnector().GetUid() + recipePermalink.Destination = dstColID + "/" + getDstConnResp.GetDestinationConnector().GetUid() // Model instances - for idx, modelInstanceRscName := range recipe.ModelInstances { + recipePermalink.ModelInstances = make([]string, len(recipeRscName.ModelInstances)) + for idx, modelInstanceRscName := range recipeRscName.ModelInstances { getModelInstResp, err := s.modelServiceClient.GetModelInstance(ctx, &modelPB.GetModelInstanceRequest{ Name: modelInstanceRscName, }) if err != nil { - return fmt.Errorf("[model-backend: GetModelInstance - Name: %s] %s", modelInstanceRscName, err) + return nil, fmt.Errorf("[model-backend] Error %s at instances/%s: %s", "GetModelInstance", modelInstanceRscName, err) } modelInstColID, err := resource.GetCollectionID(modelInstanceRscName) if err != nil { - return err + return nil, err } modelInstID, err := resource.GetRscNameID(modelInstanceRscName) if err != nil { - return err + return nil, err } modelRscName := strings.TrimSuffix(modelInstanceRscName, "/"+modelInstColID+"/"+modelInstID) @@ -99,76 +102,79 @@ func (s *service) recipeNameToPermalink(recipe *datamodel.Recipe) error { Name: modelRscName, }) if err != nil { - return fmt.Errorf("[model-backend: GetModel - Name: %s] %s", modelRscName, err) + return nil, fmt.Errorf("[model-backend] Error %s at models/%s: %s", "GetModel", modelRscName, err) } modelColID, err := resource.GetCollectionID(modelRscName) if err != nil { - return err + return nil, err } - recipe.ModelInstances[idx] = modelColID + "/" + getModelResp.GetModel().GetUid() + "/" + modelInstColID + "/" + getModelInstResp.GetInstance().GetUid() + recipePermalink.ModelInstances[idx] = modelColID + "/" + getModelResp.GetModel().GetUid() + "/" + modelInstColID + "/" + getModelInstResp.GetInstance().GetUid() } - return nil + return &recipePermalink, nil } -func (s *service) recipePermalinkToName(recipe *datamodel.Recipe) error { +func (s *service) recipePermalinkToName(recipePermalink *datamodel.Recipe) (*datamodel.Recipe, error) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + recipeRscName := datamodel.Recipe{} + // Source connector lookUpSrcConnResp, err := s.connectorServiceClient.LookUpSourceConnector(ctx, &connectorPB.LookUpSourceConnectorRequest{ - Permalink: recipe.Source, + Permalink: recipePermalink.Source, }) if err != nil { - return fmt.Errorf("[connector-backend: LookUpSourceConnector - Permalink: %s] %s", recipe.Source, err) + return nil, fmt.Errorf("[connector-backend] Error %s at source-connectors/%s: %s", "LookUpSourceConnector", recipePermalink.Source, err) } - srcColID, err := resource.GetCollectionID(recipe.Source) + srcColID, err := resource.GetCollectionID(recipePermalink.Source) if err != nil { - return err + return nil, err } - recipe.Source = srcColID + "/" + lookUpSrcConnResp.GetSourceConnector().GetId() + recipeRscName.Source = srcColID + "/" + lookUpSrcConnResp.GetSourceConnector().GetId() // Destination connector lookUpDstConnResp, err := s.connectorServiceClient.LookUpDestinationConnector(ctx, &connectorPB.LookUpDestinationConnectorRequest{ - Permalink: recipe.Destination, + Permalink: recipePermalink.Destination, }) if err != nil { - return fmt.Errorf("[connector-backend: LookUpDestinationConnector - Permalink: %s] %s", recipe.Destination, err) + return nil, fmt.Errorf("[connector-backend] Error %s at destination-connectors/%s: %s", "LookUpDestinationConnector", recipePermalink.Destination, err) } - dstColID, err := resource.GetCollectionID(recipe.Destination) + dstColID, err := resource.GetCollectionID(recipePermalink.Destination) if err != nil { - return err + return nil, err } - recipe.Destination = dstColID + "/" + lookUpDstConnResp.GetDestinationConnector().GetId() + recipeRscName.Destination = dstColID + "/" + lookUpDstConnResp.GetDestinationConnector().GetId() // Model instances - for idx, modelInstanceRscPermalink := range recipe.ModelInstances { + recipeRscName.ModelInstances = make([]string, len(recipePermalink.ModelInstances)) + for idx, modelInstanceRscPermalink := range recipePermalink.ModelInstances { lookUpModelInstResp, err := s.modelServiceClient.LookUpModelInstance(ctx, &modelPB.LookUpModelInstanceRequest{ Permalink: modelInstanceRscPermalink, }) if err != nil { - return fmt.Errorf("[model-backend: LookUpModelInstance - Permalink: %s] %s", modelInstanceRscPermalink, err) + return nil, fmt.Errorf("[model-backend] Error %s at instances/%s: %s", "LookUpModelInstance", modelInstanceRscPermalink, err) } modelInstUID, err := resource.GetPermalinkUID(modelInstanceRscPermalink) if err != nil { - return err + return nil, err } modelInstColID, err := resource.GetCollectionID(modelInstanceRscPermalink) if err != nil { - return err + return nil, err } modelRscPermalink := strings.TrimSuffix(modelInstanceRscPermalink, "/"+modelInstColID+"/"+modelInstUID) @@ -177,16 +183,16 @@ func (s *service) recipePermalinkToName(recipe *datamodel.Recipe) error { Permalink: modelRscPermalink, }) if err != nil { - return fmt.Errorf("[model-backend: LookUpModel - Permalink: %s] %s", modelRscPermalink, err) + return nil, fmt.Errorf("[model-backend] Error %s at models/%s: %s", "LookUpModel", modelRscPermalink, err) } modelColID, err := resource.GetCollectionID(modelRscPermalink) if err != nil { - return err + return nil, err } - recipe.ModelInstances[idx] = modelColID + "/" + lookUpModelResp.Model.GetId() + "/" + modelInstColID + "/" + lookUpModelInstResp.GetInstance().GetId() + recipeRscName.ModelInstances[idx] = modelColID + "/" + lookUpModelResp.Model.GetId() + "/" + modelInstColID + "/" + lookUpModelInstResp.GetInstance().GetId() } - return nil + return &recipeRscName, nil } diff --git a/pkg/service/mode.go b/pkg/service/mode.go index cc32be64d..65811b730 100644 --- a/pkg/service/mode.go +++ b/pkg/service/mode.go @@ -13,31 +13,54 @@ import ( pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha" ) -func (s *service) getModeByConnRscName(srcConnRscName string, dstConnRscName string) (datamodel.PipelineMode, error) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) +func (s *service) checkMode(recipeRscName *datamodel.Recipe) (datamodel.PipelineMode, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - srcConnResp, err := s.connectorServiceClient.GetSourceConnector(ctx, &connectorPB.GetSourceConnectorRequest{Name: srcConnRscName}) + srcConnRscName := recipeRscName.Source + dstConnRscName := recipeRscName.Destination + + srcConnResp, err := s.connectorServiceClient.GetSourceConnector(ctx, + &connectorPB.GetSourceConnectorRequest{ + Name: srcConnRscName, + }) if err != nil { - return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), err + return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), + status.Errorf(codes.Internal, "[connector-backend] Error %s at source-connectors/%s: %v", + "GetSourceConnector", srcConnRscName, err.Error()) + } - srcConnDefResp, err := s.connectorServiceClient.GetSourceConnectorDefinition(ctx, &connectorPB.GetSourceConnectorDefinitionRequest{Name: srcConnResp.GetSourceConnector().GetSourceConnectorDefinition()}) + srcConnDefResp, err := s.connectorServiceClient.GetSourceConnectorDefinition(ctx, + &connectorPB.GetSourceConnectorDefinitionRequest{ + Name: srcConnResp.GetSourceConnector().GetSourceConnectorDefinition(), + }) if err != nil { - return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), err + return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), + status.Errorf(codes.Internal, "[connector-backend] Error %s at source-connector-definitions/%s: %v", + "GetSourceConnectorDefinition", srcConnResp.GetSourceConnector().GetSourceConnectorDefinition(), err.Error()) } srcConnType := srcConnDefResp.GetSourceConnectorDefinition().GetConnectorDefinition().GetConnectionType() - dstConnResp, err := s.connectorServiceClient.GetDestinationConnector(ctx, &connectorPB.GetDestinationConnectorRequest{Name: dstConnRscName}) + dstConnResp, err := s.connectorServiceClient.GetDestinationConnector(ctx, + &connectorPB.GetDestinationConnectorRequest{ + Name: dstConnRscName, + }) if err != nil { - return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), err + return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), + status.Errorf(codes.Internal, "[connector-backend] Error %s at destination-connectors/%s: %v", + "GetDestinationConnector", dstConnRscName, err.Error()) } dstConnDefResp, err := s.connectorServiceClient.GetDestinationConnectorDefinition(ctx, - &connectorPB.GetDestinationConnectorDefinitionRequest{Name: dstConnResp.GetDestinationConnector().GetDestinationConnectorDefinition()}) + &connectorPB.GetDestinationConnectorDefinitionRequest{ + Name: dstConnResp.GetDestinationConnector().GetDestinationConnectorDefinition(), + }) if err != nil { - return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), err + return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_UNSPECIFIED), + status.Errorf(codes.Internal, "[connector-backend] Error %s at source-connector-definitions/%s: %v", + "GetDestinationConnectorDefinitionRequest", dstConnResp.GetDestinationConnector().GetDestinationConnectorDefinition(), err.Error()) } dstConnType := dstConnDefResp.GetDestinationConnectorDefinition().GetConnectorDefinition().GetConnectionType() @@ -45,8 +68,9 @@ func (s *service) getModeByConnRscName(srcConnRscName string, dstConnRscName str if srcConnType == connectorPB.ConnectionType_CONNECTION_TYPE_DIRECTNESS && dstConnType == connectorPB.ConnectionType_CONNECTION_TYPE_DIRECTNESS { - // Relying on a hardcoding naming rule "source-*" and "destination-*" for directness connectors - if strings.Split(srcConnDefResp.GetSourceConnectorDefinition().GetId(), "-")[1] == strings.Split(dstConnDefResp.GetDestinationConnectorDefinition().GetId(), "-")[1] { + // A hardcoding naming rule "source-*" and "destination-*" for directness connectors + if strings.Split(srcConnDefResp.GetSourceConnectorDefinition().GetId(), "-")[1] == + strings.Split(dstConnDefResp.GetDestinationConnectorDefinition().GetId(), "-")[1] { return datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC), nil } diff --git a/pkg/service/service.go b/pkg/service/service.go index 60bf392fd..e2436cef7 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -37,7 +37,6 @@ type Service interface { UpdatePipelineID(id string, ownerRscName string, newID string) (*datamodel.Pipeline, error) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, pipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineResponse, error) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLengths []uint64, pipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineBinaryFileUploadResponse, error) - ValidatePipeline(pipeline *datamodel.Pipeline) error } type service struct { @@ -61,7 +60,7 @@ func NewService(r repository.Repository, mu mgmtPB.UserServiceClient, c connecto func (s *service) CreatePipeline(dbPipeline *datamodel.Pipeline) (*datamodel.Pipeline, error) { - mode, err := s.getModeByConnRscName(dbPipeline.Recipe.Source, dbPipeline.Recipe.Destination) + mode, err := s.checkMode(dbPipeline.Recipe) if err != nil { return nil, err } @@ -76,15 +75,22 @@ func (s *service) CreatePipeline(dbPipeline *datamodel.Pipeline) (*datamodel.Pip dbPipeline.Owner = ownerPermalink - if err := s.recipeNameToPermalink(dbPipeline.Recipe); err != nil { + recipeRscName := dbPipeline.Recipe + recipePermalink, err := s.recipeNameToPermalink(dbPipeline.Recipe) + if err != nil { return nil, status.Errorf(codes.InvalidArgument, err.Error()) } + dbPipeline.Recipe = recipePermalink + if dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC) { dbPipeline.State = datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE) } else { // TODO: Dispatch job to Temporal for periodical connection state check - dbPipeline.State = datamodel.PipelineState(pipelinePB.Pipeline_STATE_INACTIVE) + dbPipeline.State, err = s.checkState(recipeRscName) + if err != nil { + return nil, err + } } if err := s.repository.CreatePipeline(dbPipeline); err != nil { @@ -96,11 +102,8 @@ func (s *service) CreatePipeline(dbPipeline *datamodel.Pipeline) (*datamodel.Pip return nil, err } - if err := s.recipePermalinkToName(dbCreatedPipeline.Recipe); err != nil { - return nil, status.Errorf(codes.Internal, err.Error()) - } - dbCreatedPipeline.Owner = ownerRscName + dbCreatedPipeline.Recipe = recipeRscName return dbCreatedPipeline, nil } @@ -123,9 +126,11 @@ func (s *service) ListPipeline(ownerRscName string, pageSize int64, pageToken st if !isBasicView { for _, dbPipeline := range dbPipelines { - if err := s.recipePermalinkToName(dbPipeline.Recipe); err != nil { + recipeRscName, err := s.recipePermalinkToName(dbPipeline.Recipe) + if err != nil { return nil, 0, "", status.Errorf(codes.Internal, err.Error()) } + dbPipeline.Recipe = recipeRscName } } @@ -147,9 +152,11 @@ func (s *service) GetPipelineByID(id string, ownerRscName string, isBasicView bo dbPipeline.Owner = ownerRscName if !isBasicView { - if err := s.recipePermalinkToName(dbPipeline.Recipe); err != nil { + recipeRscName, err := s.recipePermalinkToName(dbPipeline.Recipe) + if err != nil { return nil, status.Errorf(codes.Internal, err.Error()) } + dbPipeline.Recipe = recipeRscName } return dbPipeline, nil @@ -170,9 +177,11 @@ func (s *service) GetPipelineByUID(uid uuid.UUID, ownerRscName string, isBasicVi dbPipeline.Owner = ownerRscName if !isBasicView { - if err := s.recipePermalinkToName(dbPipeline.Recipe); err != nil { + recipeRscName, err := s.recipePermalinkToName(dbPipeline.Recipe) + if err != nil { return nil, status.Errorf(codes.Internal, err.Error()) } + dbPipeline.Recipe = recipeRscName } return dbPipeline, nil @@ -201,11 +210,13 @@ func (s *service) UpdatePipeline(id string, ownerRscName string, toUpdPipeline * return nil, err } - if err := s.recipePermalinkToName(dbPipeline.Recipe); err != nil { + recipeRscName, err := s.recipePermalinkToName(dbPipeline.Recipe) + if err != nil { return nil, status.Errorf(codes.Internal, err.Error()) } dbPipeline.Owner = ownerRscName + dbPipeline.Recipe = recipeRscName return dbPipeline, nil } @@ -234,11 +245,14 @@ func (s *service) UpdatePipelineState(id string, ownerRscName string, state data return nil, err } - if err := s.recipePermalinkToName(dbPipeline.Recipe); err != nil { + recipeRscName, err := s.recipePermalinkToName(dbPipeline.Recipe) + if err != nil { return nil, status.Errorf(codes.Internal, err.Error()) } - mode, err := s.getModeByConnRscName(dbPipeline.Recipe.Source, dbPipeline.Recipe.Destination) + dbPipeline.Recipe = recipeRscName + + mode, err := s.checkMode(dbPipeline.Recipe) if err != nil { return nil, err } @@ -256,11 +270,8 @@ func (s *service) UpdatePipelineState(id string, ownerRscName string, state data return nil, err } - if err := s.recipePermalinkToName(dbPipeline.Recipe); err != nil { - return nil, status.Errorf(codes.Internal, err.Error()) - } - dbPipeline.Owner = ownerRscName + dbPipeline.Recipe = recipeRscName return dbPipeline, nil } @@ -286,32 +297,23 @@ func (s *service) UpdatePipelineID(id string, ownerRscName string, newID string) return nil, err } - if err := s.recipePermalinkToName(dbPipeline.Recipe); err != nil { + recipeRscName, err := s.recipePermalinkToName(dbPipeline.Recipe) + if err != nil { return nil, status.Errorf(codes.Internal, err.Error()) } dbPipeline.Owner = ownerRscName + dbPipeline.Recipe = recipeRscName return dbPipeline, nil } -func (s *service) ValidatePipeline(pipeline *datamodel.Pipeline) error { - - // Validation: Pipeline is in inactive state - if pipeline.State == datamodel.PipelineState(pipelinePB.Pipeline_STATE_INACTIVE) { - return status.Error(codes.FailedPrecondition, "This pipeline is inactivated") - } +func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineResponse, error) { - // Validation: Pipeline is in error state - if pipeline.State == datamodel.PipelineState(pipelinePB.Pipeline_STATE_ERROR) { - return status.Error(codes.FailedPrecondition, "This pipeline has errors") + if dbPipeline.State != datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE) { + return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("The pipeline %s is not active", dbPipeline.ID)) } - return nil -} - -func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineResponse, error) { - ownerPermalink, err := s.ownerRscNameToPermalink(dbPipeline.Owner) if err != nil { return nil, status.Errorf(codes.InvalidArgument, err.Error()) @@ -336,19 +338,19 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipe } } - var outputs []*structpb.Struct - for idx, modelInst := range dbPipeline.Recipe.ModelInstances { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + var outputs []*structpb.Struct + for idx, modelInstRscName := range dbPipeline.Recipe.ModelInstances { // TODO: async call model-backend resp, err := s.modelServiceClient.TriggerModelInstance(ctx, &modelPB.TriggerModelInstanceRequest{ - Name: modelInst, + Name: modelInstRscName, Inputs: inputs, }) if err != nil { - return nil, status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: %v", "TriggerModel", idx, modelInst, err.Error()) + return nil, status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: %v", "TriggerModel", idx, modelInstRscName, err.Error()) } outputs = append(outputs, resp.Output) @@ -373,16 +375,38 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipe }, nil // If this is a async trigger, write to the destination connector case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_ASYNC): - return nil, nil - // The default case should never been reached + + for idx, modelInstRecName := range dbPipeline.Recipe.ModelInstances { + + modelInstResp, err := s.modelServiceClient.GetModelInstance(ctx, &modelPB.GetModelInstanceRequest{ + Name: modelInstRecName, + }) + if err != nil { + return nil, status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: %v", "GetModelInstance", idx, modelInstRecName, err.Error()) + } + + _, err = s.connectorServiceClient.WriteDestinationConnector(ctx, &connectorPB.WriteDestinationConnectorRequest{ + Name: dbPipeline.Recipe.Destination, + Task: modelInstResp.Instance.GetTask(), + Data: outputs[idx], + }) + if err != nil { + return nil, status.Errorf(codes.Internal, "[connector-backend] Error %s at %dth model instance %s: %v", "WriteDestinationConnector", idx, modelInstRecName, err.Error()) + } + } + return &pipelinePB.TriggerPipelineResponse{}, nil default: - return nil, nil + return &pipelinePB.TriggerPipelineResponse{}, nil } } func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLengths []uint64, dbPipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineBinaryFileUploadResponse, error) { + if dbPipeline.State != datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE) { + return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("The pipeline %s is not active", dbPipeline.ID)) + } + ownerPermalink, err := s.ownerRscNameToPermalink(dbPipeline.Owner) if err != nil { return nil, status.Errorf(codes.InvalidArgument, err.Error()) @@ -394,7 +418,7 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng for idx, modelInst := range dbPipeline.Recipe.ModelInstances { // TODO: async call model-backend - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() stream, err := s.modelServiceClient.TriggerModelInstanceBinaryFileUpload(ctx) @@ -402,7 +426,7 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng _ = stream.CloseSend() }() if err != nil { - return nil, fmt.Errorf("[model-backend] Error %s at %dth model instance %s: cannot init stream: %v", "TriggerModelBinaryFileUpload", idx, modelInst, err.Error()) + return nil, status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: cannot init stream: %v", "TriggerModelBinaryFileUpload", idx, modelInst, err.Error()) } err = stream.Send(&modelPB.TriggerModelInstanceBinaryFileUploadRequest{ diff --git a/pkg/service/state.go b/pkg/service/state.go new file mode 100644 index 000000000..8b1aa53b8 --- /dev/null +++ b/pkg/service/state.go @@ -0,0 +1,80 @@ +package service + +import ( + "context" + "time" + + "github.com/gogo/status" + "google.golang.org/grpc/codes" + + "github.com/instill-ai/pipeline-backend/pkg/datamodel" + + connectorPB "github.com/instill-ai/protogen-go/vdp/connector/v1alpha" + modelPB "github.com/instill-ai/protogen-go/vdp/model/v1alpha" + pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha" +) + +func (s *service) checkState(recipeRscName *datamodel.Recipe) (datamodel.PipelineState, error) { + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + srcConnResp, err := s.connectorServiceClient.GetSourceConnector(ctx, &connectorPB.GetSourceConnectorRequest{ + Name: recipeRscName.Source, + }) + if err != nil { + return datamodel.PipelineState(pipelinePB.Pipeline_STATE_UNSPECIFIED), + status.Errorf(codes.Internal, "[connector-backend] Error %s at source-connectors/%s: %v", "GetDestinationConnector", recipeRscName.Source, err.Error()) + } + + srcConnState := int(srcConnResp.GetSourceConnector().GetConnector().GetState().Number()) + + dstConnResp, err := s.connectorServiceClient.GetDestinationConnector(ctx, &connectorPB.GetDestinationConnectorRequest{ + Name: recipeRscName.Destination, + }) + if err != nil { + return datamodel.PipelineState(pipelinePB.Pipeline_STATE_UNSPECIFIED), + status.Errorf(codes.Internal, "[connector-backend] Error %s at destination-connectors/%s: %v", "GetDestinationConnector", recipeRscName.Destination, err.Error()) + } + + dstConnState := int(dstConnResp.GetDestinationConnector().GetConnector().GetState().Number()) + + modelInstStates := make([]int, len(recipeRscName.ModelInstances)) + for idx, modelInst := range recipeRscName.ModelInstances { + modelInstResp, err := s.modelServiceClient.GetModelInstance(ctx, &modelPB.GetModelInstanceRequest{ + Name: modelInst, + }) + if err != nil { + return datamodel.PipelineState(pipelinePB.Pipeline_STATE_UNSPECIFIED), + status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: %v", "GetModelInstance", idx, modelInst, err.Error()) + } + modelInstStates[idx] = int(modelInstResp.Instance.State.Number()) + } + + // State precedence rule (i.e., enum_number state logic) : 3 error (any of) > 0 unspecified (any of) > 1 negative (any of) > 2 positive (all of) + states := []int{srcConnState, dstConnState} + states = append(states, modelInstStates...) + + if contains(states, 3) { + return datamodel.PipelineState(pipelinePB.Pipeline_STATE_ERROR), nil + } + + if contains(states, 0) { + return datamodel.PipelineState(pipelinePB.Pipeline_STATE_UNSPECIFIED), nil + } + + if contains(states, 1) { + return datamodel.PipelineState(pipelinePB.Pipeline_STATE_INACTIVE), nil + } + + return datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE), nil +} + +func contains(slice interface{}, elem interface{}) bool { + for _, v := range slice.([]int) { + if v == elem { + return true + } + } + return false +}