Skip to content
This repository has been archived by the owner on Feb 14, 2024. It is now read-only.

Commit

Permalink
update to support latest protos with ordered pipelines (#82)
Browse files Browse the repository at this point in the history
* update to support latest protos with ordered pipelines

* remove unused attach and detach

* enable onTrue condition for wasm test to demonstrate wasm error

* commenting out passing step conditions in wasm request

* don't pass result conditions to wasm, handle them in the sdk

* fix data bug

* update exmaples

* fix static schema inference name checks

* don't skip infer schema pipeline

---------

Co-authored-by: Jacob Heric <[email protected]:q!>
Co-authored-by: Daniel Selans <[email protected]>
  • Loading branch information
3 people authored Feb 3, 2024
1 parent 1d97aa3 commit 9bffc6d
Show file tree
Hide file tree
Showing 15 changed files with 303 additions and 184 deletions.
6 changes: 4 additions & 2 deletions examples/commonjs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { SDKResponse } from "@streamdal/node-sdk";

const {
ExecStatus,
SDKResponse,
OperationType,
Streamdal,
// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -55,8 +57,8 @@ export const example = async () => {
data: new TextEncoder().encode(JSON.stringify(exampleData)),
});

if (result.error) {
console.error("Pipeline error", result.errorMessage);
if (result.status === ExecStatus.ERROR) {
console.error("Pipeline error", result.statusMessage);
//
// Optionally explore more detailed step status information
console.dir(result.pipelineStatus);
Expand Down
5 changes: 3 additions & 2 deletions examples/esm/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
Audience,
ExecStatus,
OperationType,
SDKResponse,
Streamdal,
Expand Down Expand Up @@ -55,8 +56,8 @@ export const example = async () => {
data: new TextEncoder().encode(JSON.stringify(exampleData)),
});

if (result.error) {
console.error("Pipeline error", result.errorMessage);
if (result.status === ExecStatus.ERROR) {
console.error("Pipeline error", result.statusMessage);
//
// Optionally explore more detailed step status information
console.dir(result.pipelineStatus);
Expand Down
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"@grpc/proto-loader": "0.7.7",
"@protobuf-ts/grpc-transport": "2.9.0",
"@protobuf-ts/runtime-rpc": "2.9.0",
"@streamdal/protos": "^0.1.14",
"@streamdal/protos": "^0.1.17",
"@types/rwlock": "^5.0.3",
"@types/sinon": "^10.0.16",
"dotenv": "^16.3.1",
Expand Down
45 changes: 21 additions & 24 deletions src/__tests__/pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import { audienceKey, internal } from "../internal/register.js";
const testConfigs = {
registered: true,
grpcClient: {
getAttachCommandsByService: () => ({
active: [],
paused: [],
getSetPipelinesCommandsByService: () => ({
setPipelineCommands: [],
}),
} as unknown as InternalClient,
sessionId: uuidv4(),
Expand All @@ -39,8 +38,7 @@ const testPipeline: Pipeline = {
steps: [
{
name: "test-step",
onSuccess: [],
onFailure: [],
dynamic: true,
step: {
oneofKind: "detective",
detective: { args: [], type: 1013, path: "object.field" },
Expand All @@ -51,48 +49,47 @@ const testPipeline: Pipeline = {
};
const testAttachCommand: Command = {
command: {
oneofKind: "attachPipeline",
attachPipeline: {
pipeline: testPipeline,
oneofKind: "setPipelines",
setPipelines: {
pipelines: [testPipeline],
},
},
audience: testAudience,
};

const testDetachCommand: Command = {
command: {
oneofKind: "detachPipeline",
detachPipeline: {
pipelineId: testPipeline.id,
oneofKind: "setPipelines",
setPipelines: {
pipelines: [],
},
},
audience: testAudience,
};

const testPauseCommand: Command = {
command: {
oneofKind: "pausePipeline",
pausePipeline: {
pipelineId: testPipeline.id,
oneofKind: "setPipelines",
setPipelines: {
pipelines: [...[{...testPipeline, Paused: true }]]
},
},
audience: testAudience,
};

const testResumeCommand: Command = {
command: {
oneofKind: "resumePipeline",
resumePipeline: {
pipelineId: testPipeline.id,
oneofKind: "setPipelines",
setPipelines: {
pipelines: [...[{...testPipeline, Paused: false }]]
},
},
audience: testAudience,
};

const testAttachCommandByServiceResponse = {
const testGetSetPipelinesCommandsByServiceResponse = {
response: {
active: [testAttachCommand],
paused: [],
setPipelineCommands : [testAttachCommand],
wasmModules: { test: { id: "test", bytes: new Uint8Array() } },
},
};
Expand All @@ -101,8 +98,8 @@ describe("pipeline tests", () => {
const key = audienceKey(testAudience);
it("initPipelines should add a given pipeline to internal store and set pipelineInitialized ", async () => {
sinon
.stub(testConfigs.grpcClient, "getAttachCommandsByService")
.resolves(testAttachCommandByServiceResponse as any);
.stub(testConfigs.grpcClient, "getSetPipelinesCommandsByService")
.resolves(testGetSetPipelinesCommandsByServiceResponse as any);

await initPipelines(testConfigs);

Expand Down Expand Up @@ -134,7 +131,7 @@ describe("pipeline tests", () => {

await processResponse(testPauseCommand);

expect(internal.pipelines.get(key)?.get(testPipeline.id)?.paused).toEqual(
expect(internal.pipelines.get(key)?.get(testPipeline.id)?.Paused).toEqual(
true
);
});
Expand All @@ -146,7 +143,7 @@ describe("pipeline tests", () => {
);
await processResponse(testResumeCommand);

expect(internal.pipelines.get(key)?.get(testPipeline.id)?.paused).toEqual(
expect(internal.pipelines.get(key)?.get(testPipeline.id)?.Paused).toEqual(
false
);
});
Expand Down
130 changes: 130 additions & 0 deletions src/__tests__/wasm.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { Command } from "@streamdal/protos/protos/sp_command";
import { InternalClient } from "@streamdal/protos/protos/sp_internal.client";
import { Pipeline } from "@streamdal/protos/protos/sp_pipeline";
import * as fs from "fs";
import sinon from "sinon";
import { v4 as uuidv4 } from "uuid";
import { describe, expect, it } from "vitest";

import { initPipelines } from "../internal/pipeline.js";
import { processPipeline } from "../internal/process.js";
import { audienceKey, internal } from "../internal/register.js";
import { ExecStatus } from "../streamdal.js";

const testData = {
boolean_t: true,
boolean_f: false,
object: {
ipv4_address: "127.0.0.1",
ipv6_address: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
mac_address: "00-B0-D0-63-C2-26",
uuid_dash: "550e8400-e29b-41d4-a716-446655440000",
uuid_colon: "550e8400:e29b:41d4:a716:446655440000",
uuid_stripped: "550e8400e29b41d4a716446655440000",
number_as_string: "1234",
field: "value",
empty_string: "",
null_field: null,
empty_array: [],
email: "[email protected]"
},
array: ["value1", "value2"],
number_int: 100,
number_float: 100.1,
timestamp_unix_str: "1614556800",
timestamp_unix_num: 1614556800,
timestamp_unix_nano_str: "1614556800000000000",
timestamp_unix_nano_num: 1614556800000000000,
timestamp_rfc3339: "2023-06-29T12:34:56Z",
};

const testConfigs = {
registered: true,
grpcClient: {
getSetPipelinesCommandsByService: () => ({
setPipelineCommands: [],
}),
} as unknown as InternalClient,
sessionId: uuidv4(),
streamdalUrl: "localhost:8082",
streamdalToken: "1234",
serviceName: "test-service",
pipelineTimeout: "100",
stepTimeout: "10",
dryRun: false,
audiences: [],
};

const testAudience = {
serviceName: "test-service",
componentName: "kafka",
operationType: 1,
operationName: "kafka-consumer",
};

const testPipeline: Pipeline = {
id: "7a04056d-52ae-467e-ab43-2a82a2c90284",
name: "test-pipeline",
steps: [
{
name: "test-step",
dynamic: false,
step: {
oneofKind: "detective",
detective: { args: [], type: 1001, path: "object.field", negate: false },
},
WasmId: "testDetectiveWasm",
WasmFunction: "f",
},
],
NotificationConfigs: [],
};

const testAttachCommand: Command = {
command: {
oneofKind: "setPipelines",
setPipelines: {
pipelines: [testPipeline],
},
},
audience: testAudience,
};

const testGetSetPipelinesCommandsByServiceResponse = {
response: {
setPipelineCommands : [testAttachCommand],
wasmModules: { testDetectiveWasm: { id: "test", bytes: new Uint8Array() } },
},
};


describe("wasm tests", async () => {
const key = audienceKey(testAudience);
const wasm = fs.readFileSync("./src/__tests__/wasm/detective.wasm");

testGetSetPipelinesCommandsByServiceResponse.response.wasmModules.testDetectiveWasm.bytes =
wasm;

it("init attach command should load pipeline wasm into internal store", async () => {
sinon
.stub(testConfigs.grpcClient, "getSetPipelinesCommandsByService")
.resolves(testGetSetPipelinesCommandsByServiceResponse as any);

await initPipelines(testConfigs);

expect(internal.wasmModules.has("testDetectiveWasm")).toEqual(true);
});

it("attached pipeline detective step should execute wasm", async () => {
internal.pipelines.set(key, new Map([[testPipeline.id, testPipeline]]));
const result = processPipeline({
originalData: new TextEncoder().encode(JSON.stringify(testData)),
audience: testAudience,
configs: testConfigs,
pipeline: testPipeline,
});

expect(result?.pipelineStatus?.stepStatus?.at(-1)?.status).toEqual(ExecStatus.TRUE);
});

});
Binary file added src/__tests__/wasm/detective.wasm
Binary file not shown.
14 changes: 5 additions & 9 deletions src/internal/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import {
OperationType,
} from "@streamdal/protos/protos/sp_common";
import { IInternalClient } from "@streamdal/protos/protos/sp_internal.client";
import { StepStatus } from "@streamdal/protos/protos/sp_sdk";
import { Pipeline } from "@streamdal/protos/protos/sp_pipeline";
import { ExecStatus, StepStatus } from "@streamdal/protos/protos/sp_sdk";
import ReadWriteLock from "rwlock";

import { InternalPipeline } from "./pipeline.js";

export const METRIC_INTERVAL = 1000;

export const metrics = new Map<string, Metric>();
Expand All @@ -19,10 +18,7 @@ export interface MetricsConfigs {
streamdalToken: string;
}

export const getStepLabels = (
audience: Audience,
pipeline: InternalPipeline
) => ({
export const getStepLabels = (audience: Audience, pipeline: Pipeline) => ({
service: audience.serviceName,
component: audience.componentName,
operation: audience.operationName,
Expand All @@ -38,7 +34,7 @@ export const stepMetrics = async ({
}: {
audience: Audience;
stepStatus: StepStatus;
pipeline: InternalPipeline;
pipeline: Pipeline;
payloadSize: number;
// eslint-disable-next-line @typescript-eslint/require-await
}) => {
Expand All @@ -51,7 +47,7 @@ export const stepMetrics = async ({
const stepProcessedKey = `counter_${opName}_processed`;
const stepBytesKey = `counter_${opName}_bytes`;

stepStatus.error &&
stepStatus.status === ExecStatus.ERROR &&
metrics.set(stepErrorKey, {
name: stepErrorKey,
value: (metrics.get(stepErrorKey)?.value ?? 0) + 1,
Expand Down
Loading

0 comments on commit 9bffc6d

Please sign in to comment.