See https://docs.streamdal.com
go get github.com/streamdal/go-sdk

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/streamdal/go-sdk"
)

func main() {
	sc, err := streamdal.New(&streamdal.Config{
		// Address of the streamdal server
		ServerURL: "streamdal-server.svc.cluster.local:8082",

		// Token used for authenticating with the streamdal server
		ServerToken: "1234",

		// Identify _this_ application/service
		ServiceName: "billing-svc",
	})
	if err != nil {
		// Without a client there is nothing to process with, so stop here
		log.Fatalf("unable to create streamdal client: %s", err)
	}

	resp := sc.Process(context.Background(), &streamdal.ProcessRequest{
		OperationType: streamdal.OperationTypeConsumer,
		OperationName: "new-order-topic",
		ComponentName: "kafka",
		Data:          []byte(`{"object": {"field": true}}`),
	})

	// Check if the .Process() call completed
	if resp.Status != streamdal.StatusError {
		fmt.Println("Successfully processed payload")
	}

	// Or you can inspect each individual pipeline & step result
	for _, pipeline := range resp.PipelineStatus {
		fmt.Printf("Inspecting '%d' steps in pipeline '%s'...\n", len(pipeline.StepStatus), pipeline.Name)

		for _, step := range pipeline.StepStatus {
			fmt.Printf("Step '%s' status: '%s'\n", step.Name, step.Status)
		}
	}
}
All configuration can be passed via streamdal.Config{}. Some values can be set via environment variables in order to support 12-Factor and usage of this SDK inside shims where streamdal.Config{} cannot be set.
Config Parameter | Environment Variable | Description | Default |
---|---|---|---|
ServerURL | STREAMDAL_URL | URL pointing to your instance of streamdal server's gRPC API. Ex: localhost:8082 | empty |
ServerToken | STREAMDAL_TOKEN | API token set in streamdal server | empty |
ServiceName | STREAMDAL_SERVICE_NAME | Identifies this service in the streamdal console | empty |
PipelineTimeout | STREAMDAL_PIPELINE_TIMEOUT | Maximum time a pipeline can run before giving up | 100ms |
StepTimeout | STREAMDAL_STEP_TIMEOUT | Maximum time a pipeline step can run before giving up | 10ms |
DryRun | STREAMDAL_DRY_RUN | If true, no data will be modified | false |
Logger | - | An optional custom logger | |
ClientType | - | 1 = ClientTypeSDK, 2 = ClientTypeShim | ClientTypeSDK |
ShutdownCtx | - | Your application's main context which will receive shutdown signals | |
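To make the environment-variable fallback in the table concrete, here is a minimal sketch of how an application could resolve those variables itself before building its config. It does not show the SDK's own resolution logic: `envConfig` and `loadEnvConfig` are hypothetical names, and the value formats are assumptions (Go duration strings such as `100ms` for the timeouts, and `true`/`false` for `STREAMDAL_DRY_RUN`). Only the variable names and defaults come from the table above.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// envConfig is a hypothetical stand-in for the fields of streamdal.Config
// that the table lists as settable through environment variables.
type envConfig struct {
	ServerURL       string
	ServerToken     string
	ServiceName     string
	PipelineTimeout time.Duration
	StepTimeout     time.Duration
	DryRun          bool
}

// loadEnvConfig reads the STREAMDAL_* variables through getenv and falls
// back to the defaults from the table when a variable is unset or empty.
func loadEnvConfig(getenv func(string) string) (envConfig, error) {
	cfg := envConfig{
		ServerURL:       getenv("STREAMDAL_URL"),
		ServerToken:     getenv("STREAMDAL_TOKEN"),
		ServiceName:     getenv("STREAMDAL_SERVICE_NAME"),
		PipelineTimeout: 100 * time.Millisecond, // table default
		StepTimeout:     10 * time.Millisecond,  // table default
		DryRun:          false,                  // table default
	}

	// Assumption: timeouts are written as Go duration strings, e.g. "250ms".
	if v := getenv("STREAMDAL_PIPELINE_TIMEOUT"); v != "" {
		d, err := time.ParseDuration(v)
		if err != nil {
			return cfg, fmt.Errorf("STREAMDAL_PIPELINE_TIMEOUT: %w", err)
		}
		cfg.PipelineTimeout = d
	}
	if v := getenv("STREAMDAL_STEP_TIMEOUT"); v != "" {
		d, err := time.ParseDuration(v)
		if err != nil {
			return cfg, fmt.Errorf("STREAMDAL_STEP_TIMEOUT: %w", err)
		}
		cfg.StepTimeout = d
	}

	// Assumption: the dry-run flag uses a form strconv.ParseBool accepts.
	if v := getenv("STREAMDAL_DRY_RUN"); v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return cfg, fmt.Errorf("STREAMDAL_DRY_RUN: %w", err)
		}
		cfg.DryRun = b
	}

	return cfg, nil
}

func main() {
	cfg, err := loadEnvConfig(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function in, rather than calling `os.Getenv` directly, keeps the resolution logic testable without mutating the process environment.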
Metrics are published to the Streamdal server and are available in Prometheus format at http://streamdal_server_url:8081/metrics
Metric | Description | Labels |
---|---|---|
streamdal_counter_consume_bytes | Number of bytes consumed by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
streamdal_counter_consume_errors | Number of errors encountered while consuming payloads | service, component_name, operation_name, pipeline_id, pipeline_name |
streamdal_counter_consume_processed | Number of payloads processed by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
streamdal_counter_produce_bytes | Number of bytes produced by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
streamdal_counter_produce_errors | Number of errors encountered while producing payloads | service, component_name, operation_name, pipeline_id, pipeline_name |
streamdal_counter_produce_processed | Number of payloads processed by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
streamdal_counter_notify | Number of notifications sent to the server | service, component_name, operation_name, pipeline_id, pipeline_name |
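Because the endpoint serves the Prometheus text format, the Streamdal counters can be picked out of a scrape by their shared `streamdal_counter_` prefix. The sketch below is one way to do that and is not part of the SDK; `filterStreamdalMetrics` is a hypothetical helper, and the program assumes the scrape output is piped in on standard input.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// filterStreamdalMetrics returns the sample lines of a Prometheus text
// exposition whose metric name starts with "streamdal_counter_".
// Comment lines (# HELP, # TYPE) begin with '#', so they never match.
func filterStreamdalMetrics(text string) []string {
	var out []string
	for _, line := range strings.Split(text, "\n") {
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, "streamdal_counter_") {
			out = append(out, line)
		}
	}
	return out
}

func main() {
	// Read the whole scrape from stdin.
	raw, err := io.ReadAll(os.Stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, line := range filterStreamdalMetrics(string(raw)) {
		fmt.Println(line)
	}
}
```

For example, the output of `curl -s http://streamdal_server_url:8081/metrics` can be piped into this program, with `streamdal_server_url` replaced by your server's host.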